Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, Subscription, TalkbackKind, SignalKind } from './types'; 2import { talkbackPlaceholder } from './helpers'; 3 4export function subscribe<T>(subscriber: (value: T) => void) { 5 return (source: Source<T>): Subscription => { 6 let talkback = talkbackPlaceholder; 7 let ended = false; 8 source(signal => { 9 if (signal === SignalKind.End) { 10 ended = true; 11 } else if (signal.tag === SignalKind.Start) { 12 (talkback = signal[0])(TalkbackKind.Pull); 13 } else if (!ended) { 14 subscriber(signal[0]); 15 talkback(TalkbackKind.Pull); 16 } 17 }); 18 return { 19 unsubscribe() { 20 if (!ended) { 21 ended = true; 22 talkback(TalkbackKind.Close); 23 } 24 }, 25 }; 26 }; 27} 28 29export function forEach<T>(subscriber: (value: T) => void) { 30 return (source: Source<T>): void => { 31 subscribe(subscriber)(source); 32 }; 33} 34 35export function publish<T>(source: Source<T>): void { 36 subscribe(_value => { 37 /*noop*/ 38 })(source); 39} 40 41export function toArray<T>(source: Source<T>): T[] { 42 const values: T[] = []; 43 let talkback = talkbackPlaceholder; 44 let ended = false; 45 source(signal => { 46 if (signal === SignalKind.End) { 47 ended = true; 48 } else if (signal.tag === SignalKind.Start) { 49 (talkback = signal[0])(TalkbackKind.Pull); 50 } else { 51 values.push(signal[0]); 52 talkback(TalkbackKind.Pull); 53 } 54 }); 55 if (!ended) talkback(TalkbackKind.Close); 56 return values; 57} 58 59export function toPromise<T>(source: Source<T>): Promise<T> { 60 return new Promise(resolve => { 61 let talkback = talkbackPlaceholder; 62 let value: T | void; 63 source(signal => { 64 if (signal === SignalKind.End) { 65 resolve(value!); 66 } else if (signal.tag === SignalKind.Start) { 67 (talkback = signal[0])(TalkbackKind.Pull); 68 } else { 69 value = signal[0]; 70 talkback(TalkbackKind.Pull); 71 } 72 }); 73 }); 74}