Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3 4type TypeOfSourceArray<T extends readonly [...any[]]> = T extends [infer Head, ...infer Tail] 5 ? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>] 6 : []; 7 8/** Combines the latest values of several sources into a Source issuing either tuple or dictionary 9 * values. 10 * 11 * @param sources - Either an array or dictionary object of Sources. 12 * @returns A {@link Source} issuing a zipped value whenever any input Source updates. 13 * 14 * @remarks 15 * `zip` combines several {@link Source | Sources}. The resulting Source will issue its first value 16 * once all input Sources have at least issued one value, and will subsequently issue a new value 17 * each time any of the Sources emits a new value. 18 * 19 * Depending on whether an array or dictionary object of Sources is passed to `zip`, its emitted 20 * values will be arrays or dictionary objects of the Sources' values. 21 * 22 * @example 23 * An example of passing a dictionary object to `zip`. If an array is passed, the resulting 24 * values will output arrays of the sources' values instead. 25 * 26 * ```ts 27 * pipe( 28 * zip({ 29 * x: fromValue(1), 30 * y: fromArray([2, 3]), 31 * }), 32 * subscribe(result => { 33 * // logs { x: 1, y: 2 } then { x: 1, y: 3 } 34 * console.log(result); 35 * }) 36 * ); 37 * ``` 38 */ 39interface zip { 40 <Sources extends readonly [...Source<any>[]]>(sources: [...Sources]): Source< 41 TypeOfSourceArray<Sources> 42 >; 43 44 <Sources extends { [prop: string]: Source<any> }>(sources: Sources): Source<{ 45 [Property in keyof Sources]: TypeOfSource<Sources[Property]>; 46 }>; 47} 48 49function zip<T>(sources: Source<T>[] | Record<string, Source<T>>): Source<T[] | Record<string, T>> { 50 const size = Object.keys(sources).length; 51 return sink => { 52 const filled: Set<string | number> = new Set(); 53 54 const talkbacks: TalkbackFn[] | Record<string, TalkbackFn | void> = Array.isArray(sources) 55 ? new Array(size).fill(talkbackPlaceholder) 56 : {}; 57 const buffer: T[] | Record<string, T> = Array.isArray(sources) ? new Array(size) : {}; 58 59 let gotBuffer = false; 60 let gotSignal = false; 61 let ended = false; 62 let endCount = 0; 63 64 for (const key in sources) { 65 (sources[key] as Source<T>)(signal => { 66 if (signal === SignalKind.End) { 67 if (endCount >= size - 1) { 68 ended = true; 69 sink(SignalKind.End); 70 } else { 71 endCount++; 72 } 73 } else if (signal.tag === SignalKind.Start) { 74 talkbacks[key] = signal[0]; 75 } else if (!ended) { 76 buffer[key] = signal[0]; 77 filled.add(key); 78 if (!gotBuffer && filled.size < size) { 79 if (!gotSignal) { 80 for (const key in sources) 81 if (!filled.has(key)) (talkbacks[key] || talkbackPlaceholder)(TalkbackKind.Pull); 82 } else { 83 gotSignal = false; 84 } 85 } else { 86 gotBuffer = true; 87 gotSignal = false; 88 sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer })); 89 } 90 } 91 }); 92 } 93 sink( 94 start(signal => { 95 if (ended) { 96 /*noop*/ 97 } else if (signal === TalkbackKind.Close) { 98 ended = true; 99 for (const key in talkbacks) talkbacks[key](TalkbackKind.Close); 100 } else if (!gotSignal) { 101 gotSignal = true; 102 for (const key in talkbacks) talkbacks[key](TalkbackKind.Pull); 103 } 104 }) 105 ); 106 }; 107} 108 109export { zip }; 110 111/** Combines the latest values of all passed sources into a Source issuing tuple values. 112 * 113 * @see {@link zip | `zip`} which this helper wraps and uses. 114 * @param sources - A variadic list of {@link Source} parameters. 115 * @returns A {@link Source} issuing a zipped value whenever any input Source updates. 116 * 117 * @remarks 118 * `combine` takes one or more {@link Source | Sources} as arguments. Once all input Sources have at 119 * least issued one value it will issue an array of all of the Sources' values. Subsequently, it 120 * will issue a new array value whenever any of the Sources update. 121 * 122 * @example 123 * 124 * ```ts 125 * pipe( 126 * combine(fromValue(1), fromValue(2)), 127 * subscribe(result => { 128 * console.log(result); // logs [1, 2] 129 * }) 130 * ); 131 * ``` 132 */ 133export function combine<Sources extends Source<any>[]>( 134 ...sources: Sources 135): Source<TypeOfSourceArray<Sources>> { 136 return zip(sources) as Source<any>; 137}