Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

/**
 * Maps a tuple of sources to the tuple of their value types by applying
 * `TypeOfSource` to each element, recursing over head and tail.
 * Anything that does not match the `[Head, ...Tail]` shape resolves to `[]`.
 */
type TypeOfSourceArray<T extends readonly [...any[]]> = T extends [infer Head, ...infer Tail]
  ? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
  : [];

/**
 * Combines several sources into one source that emits a snapshot of the most
 * recent value of every input.
 *
 * - Nothing is emitted until every input has produced at least one value.
 * - After that, every value from any input emits a fresh snapshot.
 * - Each snapshot is a shallow copy (array `slice` or object spread) of the
 *   internal buffer, so consumers never observe later mutations.
 * - The output ends only once all inputs have ended.
 *
 * @param sources - An array (tuple) of sources, or an object whose properties are sources.
 * @returns A source of arrays (for array input) or objects (for object input)
 *   keyed like `sources`.
 */
export function zip<Sources extends readonly [...Source<any>[]]>(
  sources: [...Sources]
): Source<TypeOfSourceArray<Sources>>;

export function zip<Sources extends { [prop: string]: Source<any> }>(
  sources: Sources
): Source<{ [Property in keyof Sources]: TypeOfSource<Sources[Property]> }>;

export function zip<T>(
  sources: Source<T>[] | Record<string, Source<T>>
): Source<T[] | Record<string, T>> {
  const size = Object.keys(sources).length;
  return sink => {
    // Keys of the inputs that have delivered at least one value so far.
    const filled: Set<string | number> = new Set();

    // For array input every slot starts out as the placeholder talkback, so it
    // can be invoked before the corresponding source has sent Start. For
    // object input the record starts empty and only gains a key once that
    // source has sent Start.
    const talkbacks: TalkbackFn[] | Record<string, TalkbackFn | void> = Array.isArray(sources)
      ? new Array(size).fill(talkbackPlaceholder)
      : {};
    // Latest value per input, in the same shape (array/object) as `sources`.
    const buffer: T[] | Record<string, T> = Array.isArray(sources) ? new Array(size) : {};

    // True once every input has produced a value and snapshots are flowing.
    let gotBuffer = false;
    // True while a Pull from the sink has been forwarded and not yet answered.
    let gotSignal = false;
    // True once the output has ended or the sink has closed it.
    let ended = false;
    // Number of inputs that have ended, not counting the final one.
    let endCount = 0;

    // NOTE(review): when `sources` is empty this loop never runs, so the sink
    // receives Start below but never End, because End is only emitted from
    // inside a source callback. Confirm whether that is intended.
    for (const key in sources) {
      (sources[key] as Source<T>)(signal => {
        if (signal === SignalKind.End) {
          // Only the last input to end terminates the output.
          if (endCount >= size - 1) {
            ended = true;
            sink(SignalKind.End);
          } else {
            endCount++;
          }
        } else if (signal.tag === SignalKind.Start) {
          // Remember this input's talkback so Pull/Close can be forwarded to it.
          talkbacks[key] = signal[0];
        } else if (!ended) {
          // Any other signal carries a value in `signal[0]`.
          buffer[key] = signal[0];
          filled.add(key);
          if (!gotBuffer && filled.size < size) {
            // Still waiting for at least one input to produce its first value.
            if (!gotSignal) {
              // No forwarded Pull is outstanding: pull from every input that
              // has not produced a value yet.
              for (const otherKey in sources) {
                if (!filled.has(otherKey)) {
                  (talkbacks[otherKey] || talkbackPlaceholder)(TalkbackKind.Pull);
                }
              }
            } else {
              // This value answers the outstanding Pull.
              gotSignal = false;
            }
          } else {
            gotBuffer = true;
            gotSignal = false;
            // Emit a copy so later buffer writes do not leak into past emissions.
            sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer }));
          }
        }
      });
    }

    sink(
      start(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === TalkbackKind.Close) {
          // Propagate cancellation to every input that has a talkback.
          ended = true;
          for (const key in talkbacks) talkbacks[key](TalkbackKind.Close);
        } else if (!gotSignal) {
          // Forward a Pull once; further Pulls are ignored until a value
          // resets `gotSignal`.
          gotSignal = true;
          for (const key in talkbacks) talkbacks[key](TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Variadic form of {@link zip} for array input: `combine(a, b, c)` returns
 * `zip([a, b, c])`.
 *
 * @param sources - The sources to combine, passed as individual arguments.
 * @returns A source emitting a tuple with the latest value of each input.
 */
export function combine<Sources extends Source<any>[]>(
  ...sources: Sources
): Source<TypeOfSourceArray<Sources>> {
  // The result is cast through `Source<any>` to satisfy the declared return type.
  return zip(sources) as Source<any>;
}