Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.0.0 3.8 kB view raw
1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3 4export function fromArray<T>(array: T[]): Source<T> { 5 return sink => { 6 let ended = false; 7 let looping = false; 8 let pulled = false; 9 let current = 0; 10 sink( 11 start(signal => { 12 if (signal === TalkbackKind.Close) { 13 ended = true; 14 } else if (looping) { 15 pulled = true; 16 } else { 17 for (pulled = looping = true; pulled && !ended; current++) { 18 if (current < array.length) { 19 pulled = false; 20 sink(push(array[current])); 21 } else { 22 ended = true; 23 sink(SignalKind.End); 24 } 25 } 26 looping = false; 27 } 28 }) 29 ); 30 }; 31} 32 33export function fromValue<T>(value: T): Source<T> { 34 return sink => { 35 let ended = false; 36 sink( 37 start(signal => { 38 if (signal === TalkbackKind.Close) { 39 ended = true; 40 } else if (!ended) { 41 ended = true; 42 sink(push(value)); 43 sink(SignalKind.End); 44 } 45 }) 46 ); 47 }; 48} 49 50export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> { 51 return sink => { 52 let ended = false; 53 const teardown = produce({ 54 next(value: T) { 55 if (!ended) sink(push(value)); 56 }, 57 complete() { 58 if (!ended) { 59 ended = true; 60 sink(SignalKind.End); 61 } 62 }, 63 }); 64 sink( 65 start(signal => { 66 if (signal === TalkbackKind.Close && !ended) { 67 ended = true; 68 teardown(); 69 } 70 }) 71 ); 72 }; 73} 74 75export function makeSubject<T>(): Subject<T> { 76 let sinks: Sink<T>[] = []; 77 let ended = false; 78 return { 79 source(sink: Sink<T>) { 80 sinks.push(sink); 81 sink( 82 start(signal => { 83 if (signal === TalkbackKind.Close) { 84 const index = sinks.indexOf(sink); 85 if (index > -1) (sinks = sinks.slice()).splice(index, 1); 86 } 87 }) 88 ); 89 }, 90 next(value: T) { 91 if (!ended) { 92 const signal = push(value); 93 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal); 94 } 95 }, 96 complete() { 97 if (!ended) { 98 ended = true; 99 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End); 100 } 101 }, 102 }; 103} 104 105export const empty: Source<any> = (sink: Sink<any>): void => { 106 let ended = false; 107 sink( 108 start(signal => { 109 if (signal === TalkbackKind.Close) { 110 ended = true; 111 } else if (!ended) { 112 ended = true; 113 sink(SignalKind.End); 114 } 115 }) 116 ); 117}; 118 119export const never: Source<any> = (sink: Sink<any>): void => { 120 sink(start(talkbackPlaceholder)); 121}; 122 123export function interval(ms: number): Source<number> { 124 return sink => { 125 let i = 0; 126 const id = setInterval(() => { 127 sink(push(i++)); 128 }, ms); 129 sink( 130 start(signal => { 131 if (signal === TalkbackKind.Close) clearInterval(id); 132 }) 133 ); 134 }; 135} 136 137export function fromDomEvent(element: HTMLElement, event: string): Source<Event> { 138 return sink => { 139 const handler = (payload: Event) => { 140 sink(push(payload)); 141 }; 142 sink( 143 start(signal => { 144 if (signal === TalkbackKind.Close) element.removeEventListener(event, handler); 145 }) 146 ); 147 element.addEventListener(event, handler); 148 }; 149} 150 151export function fromPromise<T>(promise: Promise<T>): Source<T> { 152 return sink => { 153 let ended = false; 154 promise.then(value => { 155 if (!ended) { 156 sink(push(value)); 157 sink(SignalKind.End); 158 } 159 }); 160 sink( 161 start(signal => { 162 if (signal === TalkbackKind.Close) ended = true; 163 }) 164 ); 165 }; 166}