Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'; 2import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers'; 3import { share } from './operators'; 4 5export function fromArray<T>(array: T[]): Source<T> { 6 return sink => { 7 let ended = false; 8 let looping = false; 9 let pulled = false; 10 let current = 0; 11 sink( 12 start(signal => { 13 if (signal === TalkbackKind.Close) { 14 ended = true; 15 } else if (looping) { 16 pulled = true; 17 } else { 18 for (pulled = looping = true; pulled && !ended; current++) { 19 if (current < array.length) { 20 pulled = false; 21 sink(push(array[current])); 22 } else { 23 ended = true; 24 sink(SignalKind.End); 25 } 26 } 27 looping = false; 28 } 29 }) 30 ); 31 }; 32} 33 34export function fromValue<T>(value: T): Source<T> { 35 return sink => { 36 let ended = false; 37 sink( 38 start(signal => { 39 if (signal === TalkbackKind.Close) { 40 ended = true; 41 } else if (!ended) { 42 ended = true; 43 sink(push(value)); 44 sink(SignalKind.End); 45 } 46 }) 47 ); 48 }; 49} 50 51export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> { 52 return sink => { 53 let ended = false; 54 const teardown = produce({ 55 next(value: T) { 56 if (!ended) sink(push(value)); 57 }, 58 complete() { 59 if (!ended) { 60 ended = true; 61 sink(SignalKind.End); 62 } 63 }, 64 }); 65 sink( 66 start(signal => { 67 if (signal === TalkbackKind.Close && !ended) { 68 ended = true; 69 teardown(); 70 } 71 }) 72 ); 73 }; 74} 75 76export function makeSubject<T>(): Subject<T> { 77 let next: Subject<T>['next'] | void; 78 let complete: Subject<T>['complete'] | void; 79 return { 80 source: share( 81 make(observer => { 82 next = observer.next; 83 complete = observer.complete; 84 return teardownPlaceholder; 85 }) 86 ), 87 next(value: T) { 88 if (next) next(value); 89 }, 90 complete() { 91 if (complete) complete(); 92 }, 93 }; 94} 95 96export const empty: Source<any> = (sink: Sink<any>): void => { 97 let ended = false; 98 sink( 99 start(signal => { 100 if (signal === TalkbackKind.Close) { 101 ended = true; 102 } else if (!ended) { 103 ended = true; 104 sink(SignalKind.End); 105 } 106 }) 107 ); 108}; 109 110export const never: Source<any> = (sink: Sink<any>): void => { 111 sink(start(talkbackPlaceholder)); 112}; 113 114export function interval(ms: number): Source<number> { 115 return make(observer => { 116 let i = 0; 117 const id = setInterval(() => observer.next(i++), ms); 118 return () => clearInterval(id); 119 }); 120} 121 122export function fromDomEvent(element: HTMLElement, event: string): Source<Event> { 123 return make(observer => { 124 element.addEventListener(event, observer.next); 125 return () => element.removeEventListener(event, observer.next); 126 }); 127} 128 129export function fromPromise<T>(promise: Promise<T>): Source<T> { 130 return make(observer => { 131 promise.then(value => { 132 Promise.resolve(value).then(() => { 133 observer.next(value); 134 observer.complete(); 135 }); 136 }); 137 return teardownPlaceholder; 138 }); 139}