// Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';

/**
 * Creates a source that defers building the real source until subscription.
 *
 * `make` is invoked once per subscribing sink, and the sink is handed straight
 * to the source it returns.
 *
 * @param make - Factory producing the source to subscribe to.
 * @returns A source that calls `make()` for every sink it receives.
 */
export function lazy<T>(make: () => Source<T>): Source<T> {
  return sink => make()(sink);
}

/**
 * Converts an async iterable into a source that pulls one item per request.
 *
 * The sink receives a talkback wrapped by `start`. On `TalkbackKind.Close` the
 * source is marked as ended and the iterator's optional `return()` is invoked
 * (not awaited). Any other signal is treated as a request for more data
 * (presumably a pull signal; only `Close` is distinguished here).
 *
 * @param iterable - The async iterable to read from; a fresh iterator is
 *   obtained for every subscribing sink.
 * @returns A source emitting the iterable's values followed by `SignalKind.End`.
 */
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
  return sink => {
    const iterator = iterable[Symbol.asyncIterator]();
    // `ended`: no further values may be emitted (closed or exhausted).
    let ended = false;
    // `looping`: the drain loop below is currently running.
    let looping = false;
    // `pulled`: a request arrived that the drain loop has not served yet.
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(async signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          // A request made while the loop is active is only recorded; the
          // running loop picks it up on its next condition check instead of
          // starting a second, re-entrant loop.
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            if ((next = await iterator.next()).done) {
              ended = true;
              if (iterator.return) await iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so that a request issued by the sink
                // during the push sets the flag again and keeps the loop going.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                // An error thrown by the sink is offered to the iterator; if
                // the iterator cannot receive it, it is rethrown.
                if (iterator.throw) {
                  if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/**
 * Reports whether `iterable` exposes a truthy `Symbol.asyncIterator` member.
 *
 * Kept as a truthiness test (rather than a `typeof` check) so the dispatch in
 * `fromIterable` behaves exactly as a direct property test would.
 */
function isAsyncIterable<T>(
  iterable: Iterable<T> | AsyncIterable<T>
): iterable is AsyncIterable<T> {
  return !!(iterable as AsyncIterable<T>)[Symbol.asyncIterator];
}

/**
 * Converts a sync or async iterable into a source that pulls one item per request.
 *
 * Async iterables are delegated to `fromAsyncIterable`. For sync iterables the
 * talkback behaves the same way without awaiting: `TalkbackKind.Close` ends the
 * source and calls the iterator's optional `return()`, and any other signal
 * requests the next value.
 *
 * @param iterable - The iterable to read from; a fresh iterator is obtained
 *   for every subscribing sink.
 * @returns A source emitting the iterable's values followed by `SignalKind.End`.
 */
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
  if (isAsyncIterable(iterable)) return fromAsyncIterable(iterable);
  // Bound to a const so the narrowed type is available inside the closure.
  const syncIterable: Iterable<T> = iterable;
  return sink => {
    const iterator = syncIterable[Symbol.iterator]();
    // `ended`: no further values may be emitted (closed or exhausted).
    let ended = false;
    // `looping`: the drain loop below is currently running.
    let looping = false;
    // `pulled`: a request arrived that the drain loop has not served yet.
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          // Requests made re-entrantly (from inside the sink while a value is
          // being pushed) are recorded and served by the loop already running.
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            if ((next = iterator.next()).done) {
              ended = true;
              if (iterator.return) iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so a request issued during the push
                // re-arms the loop.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                // An error thrown by the sink is offered to the iterator; if
                // the iterator cannot receive it, it is rethrown.
                if (iterator.throw) {
                  if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/** Alias of `fromIterable` typed for plain arrays. */
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;

/**
 * Creates a source that emits a single value and then ends.
 *
 * The value and `SignalKind.End` are sent on the first non-close talkback
 * signal; if the sink closes first, nothing is emitted.
 *
 * @param value - The value to emit.
 * @returns A source emitting `value` once.
 */
export function fromValue<T>(value: T): Source<T> {
  return sink => {
    let ended = false;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
        } else if (!ended) {
          ended = true;
          sink(push(value));
          sink(SignalKind.End);
        }
      })
    );
  };
}

/**
 * Creates a source driven by a producer callback.
 *
 * `produce` runs once per subscribing sink, before the `start` signal is sent
 * to that sink, and receives an observer whose `next` and `complete` become
 * no-ops once the source has ended. The returned teardown is called when the
 * sink sends `TalkbackKind.Close`, unless the source has already ended.
 *
 * @param produce - Receives the observer and returns a teardown function.
 * @returns A source emitting whatever the producer pushes through the observer.
 */
export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
  return sink => {
    let ended = false;
    const teardown = produce({
      next(value: T) {
        if (!ended) sink(push(value));
      },
      complete() {
        if (!ended) {
          ended = true;
          sink(SignalKind.End);
        }
      },
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          teardown();
        }
      })
    );
  };
}

/**
 * Creates a subject: a source plus `next`/`complete` methods that feed it.
 *
 * The source is `make(...)` passed through `share` (presumably so that all
 * subscribers observe the same producer; see `share` for the exact semantics).
 * The observer's callbacks are captured when the producer runs, so calls to
 * `next` and `complete` made before that point are ignored.
 *
 * @returns An object exposing `source`, `next` and `complete`.
 */
export function makeSubject<T>(): Subject<T> {
  let next: Subject<T>['next'] | void;
  let complete: Subject<T>['complete'] | void;
  return {
    source: share(
      make(observer => {
        next = observer.next;
        complete = observer.complete;
        return teardownPlaceholder;
      })
    ),
    next(value: T) {
      if (next) next(value);
    },
    complete() {
      if (complete) complete();
    },
  };
}

/**
 * A source that emits no values and sends `SignalKind.End` on the first
 * non-close talkback signal it receives.
 */
export const empty: Source<any> = (sink: Sink<any>): void => {
  let ended = false;
  sink(
    start(signal => {
      if (signal === TalkbackKind.Close) {
        ended = true;
      } else if (!ended) {
        ended = true;
        sink(SignalKind.End);
      }
    })
  );
};

/**
 * A source that only sends the `start` signal (with `talkbackPlaceholder` as
 * its talkback) and never emits a value or an end signal itself.
 */
export const never: Source<any> = (sink: Sink<any>): void => {
  sink(start(talkbackPlaceholder));
};

/**
 * Creates a source that emits an increasing counter on a timer.
 *
 * Counting starts at 0 for each subscription; the timer is cleared by the
 * teardown returned to `make`.
 *
 * @param ms - Delay passed to `setInterval`, in milliseconds.
 * @returns A source of consecutive integers.
 */
export function interval(ms: number): Source<number> {
  return make(observer => {
    let i = 0;
    const id = setInterval(() => observer.next(i++), ms);
    return () => clearInterval(id);
  });
}

/**
 * Creates a source that emits DOM events dispatched on an element.
 *
 * The observer's `next` is registered directly as the listener and removed
 * again by the teardown returned to `make`.
 *
 * @param element - The element to listen on.
 * @param event - The event type passed to `addEventListener`.
 * @returns A source of the received events.
 */
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
  return make(observer => {
    element.addEventListener(event, observer.next);
    return () => element.removeEventListener(event, observer.next);
  });
}

/**
 * Creates a source that emits a promise's resolved value and then completes.
 *
 * Emission is delayed by one extra promise tick after the promise resolves.
 * Rejections are not handled here. The teardown is `teardownPlaceholder`, so
 * closing does not cancel the promise; the observer supplied by `make` simply
 * ignores `next`/`complete` once the source has ended.
 *
 * @param promise - The promise to convert.
 * @returns A source emitting the resolved value once.
 */
export function fromPromise<T>(promise: Promise<T>): Source<T> {
  return make(observer => {
    promise.then(value => {
      Promise.resolve(value).then(() => {
        observer.next(value);
        observer.complete();
      });
    });
    return teardownPlaceholder;
  });
}