import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types'; import { push, start, talkbackPlaceholder } from './helpers'; type TypeOfSourceArray = T extends [infer Head, ...infer Tail] ? [TypeOfSource, ...TypeOfSourceArray] : []; export function zip[]]>( sources: [...Sources] ): Source>; export function zip }>( sources: Sources ): Source<{ [Property in keyof Sources]: TypeOfSource }>; export function zip( sources: Source[] | Record> ): Source> { const size = Object.keys(sources).length; return sink => { const filled: Set = new Set(); const talkbacks: TalkbackFn[] | Record = Array.isArray(sources) ? new Array(size).fill(talkbackPlaceholder) : {}; const buffer: T[] | Record = Array.isArray(sources) ? new Array(size) : {}; let gotBuffer = false; let gotSignal = false; let ended = false; let endCount = 0; for (const key in sources) { (sources[key] as Source)(signal => { if (signal === SignalKind.End) { if (endCount >= size - 1) { ended = true; sink(SignalKind.End); } else { endCount++; } } else if (signal.tag === SignalKind.Start) { talkbacks[key] = signal[0]; } else if (!ended) { buffer[key] = signal[0]; filled.add(key); if (!gotBuffer && filled.size < size) { if (!gotSignal) { for (const key in sources) if (!filled.has(key)) (talkbacks[key] || talkbackPlaceholder)(TalkbackKind.Pull); } else { gotSignal = false; } } else { gotBuffer = true; gotSignal = false; sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer })); } } }); } sink( start(signal => { if (ended) { /*noop*/ } else if (signal === TalkbackKind.Close) { ended = true; for (const key in talkbacks) talkbacks[key](TalkbackKind.Close); } else if (!gotSignal) { gotSignal = true; for (const key in talkbacks) talkbacks[key](TalkbackKind.Pull); } }) ); }; } export function combine[]>( ...sources: Sources ): Source> { return zip(sources) as Source; }