import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

/**
 * Maps a tuple of sources to the tuple of the value types those sources emit,
 * position by position: the head is unwrapped with `TypeOfSource` and the tail
 * is processed recursively.
 *
 * The pattern is matched as a `readonly` tuple so that both mutable and
 * readonly tuples (which the constraint on `T` allows) are unwrapped; anything
 * that is not a non-empty tuple resolves to `[]`, which also terminates the
 * recursion.
 */
type TypeOfSourceArray<T extends readonly [...any[]]> = T extends readonly [
  infer Head,
  ...infer Tail
]
  ? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
  : [];

/**
 * Combines several sources into a single source that emits a snapshot of the
 * most recent value of every input.
 *
 * Nothing is emitted until every input has pushed at least once. After that,
 * each push from any input emits a fresh copy of the whole buffer, so values
 * are NOT paired one-to-one: an input's last value is reused until it pushes
 * again.
 *
 * The combined source ends only once every input has ended. A `Close` from the
 * sink is forwarded to every input talkback received so far.
 *
 * With zero inputs no callback is ever registered, so the sink receives the
 * start signal and then neither a push nor an end.
 *
 * @param sources Either an array/tuple of sources (the result emits arrays in
 *   the same order) or an object of sources (the result emits objects with the
 *   same keys).
 * @returns A source of buffer snapshots shaped like `sources`.
 */
export function zip<Sources extends readonly [...Source<any>[]]>(
  sources: [...Sources]
): Source<TypeOfSourceArray<Sources>>;

export function zip<Sources extends { [prop: string]: Source<any> }>(
  sources: Sources
): Source<{ [Property in keyof Sources]: TypeOfSource<Sources[Property]> }>;

export function zip<T>(
  sources: Source<T>[] | Record<string, Source<T>>
): Source<T[] | Record<string, T>> {
  const size = Object.keys(sources).length;
  return sink => {
    // Keys of the inputs that have pushed at least one value.
    const filled: Set<string | number> = new Set();

    // Talkback per input, stored under the same key as the input itself.
    // Array inputs are pre-filled with `talkbackPlaceholder` (presumably a
    // no-op talkback; it is defined in ./helpers) so every slot is callable
    // before the matching Start signal arrives.
    const talkbacks: TalkbackFn[] | Record<string, TalkbackFn | void> = Array.isArray(sources)
      ? new Array(size).fill(talkbackPlaceholder)
      : {};
    // Latest value per input, mirroring the shape of `sources`.
    const buffer: T[] | Record<string, T> = Array.isArray(sources) ? new Array(size) : {};

    // True once every input has produced a value; from then on every push emits.
    let gotBuffer = false;
    // True while a pull from the sink has been forwarded and not yet answered by a push.
    let gotSignal = false;
    // True once the sink closed us or the final End has been forwarded.
    let ended = false;
    // Number of inputs that have ended, not counting the last one.
    let endCount = 0;

    for (const key in sources) {
      (sources[key] as Source<T>)(signal => {
        if (signal === SignalKind.End) {
          // Only the End of the last remaining input is forwarded downstream.
          if (endCount >= size - 1) {
            ended = true;
            sink(SignalKind.End);
          } else {
            endCount++;
          }
        } else if (signal.tag === SignalKind.Start) {
          // Start signals carry the input's talkback at index 0.
          talkbacks[key] = signal[0];
        } else if (!ended) {
          // Push signals carry the value at index 0.
          buffer[key] = signal[0];
          filled.add(key);
          if (!gotBuffer && filled.size < size) {
            // Still waiting for a first value from some inputs.
            if (!gotSignal) {
              // No forwarded pull is outstanding, so ask the inputs that have
              // not produced anything yet.
              for (const pending in sources) {
                if (!filled.has(pending)) {
                  (talkbacks[pending] || talkbackPlaceholder)(TalkbackKind.Pull);
                }
              }
            } else {
              // This push answers the outstanding pull.
              gotSignal = false;
            }
          } else {
            gotBuffer = true;
            gotSignal = false;
            // Emit a copy so later pushes cannot mutate what the sink received.
            sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer }));
          }
        }
      });
    }
    sink(
      start(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === TalkbackKind.Close) {
          ended = true;
          for (const key in talkbacks) talkbacks[key](TalkbackKind.Close);
        } else if (!gotSignal) {
          // Forward the pull to every input, but only once until a push answers it.
          gotSignal = true;
          for (const key in talkbacks) talkbacks[key](TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Variadic convenience wrapper around `zip`: collects its arguments into an
 * array and passes that array to `zip`.
 *
 * @param sources The sources to combine, in the order their values should
 *   appear in each emitted tuple.
 * @returns Whatever `zip` returns for the collected array, typed as a source
 *   of tuples of the inputs' value types.
 */
export function combine<Sources extends Source<any>[]>(
  ...sources: Sources
): Source<TypeOfSourceArray<Sources>> {
  // The cast bridges zip's overload result type to the declared tuple type.
  return zip(sources) as Source<any>;
}