1import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
/** Maps a tuple or array of Sources to the tuple or array of their `TypeOfSource` types.
 *
 * @remarks
 * Tuples, whether mutable or `readonly`, are unwrapped element by element into a mutable tuple.
 * An array type without a statically known length (for example `Source<number>[]`, or the rest
 * tail of a tuple) resolves to an array of the unwrapped element type instead of collapsing to
 * the empty tuple.
 */
type TypeOfSourceArray<T extends readonly [...any[]]> = T extends readonly [
  infer Head,
  ...infer Tail
]
  ? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
  : T extends readonly []
  ? // An exhausted tuple terminates the recursion.
    []
  : T extends readonly (infer Item)[]
  ? // Open-ended arrays have no head element to peel off; unwrap their element type directly.
    TypeOfSource<Item>[]
  : [];
7
/** Merges the most recent values of several sources into one Source of tuple or dictionary values.
 *
 * @param sources - Either an array of Sources, or a dictionary object whose values are Sources.
 * @returns A {@link Source} that issues a combined value each time one of the inputs updates.
 *
 * @remarks
 * `zip` subscribes to every {@link Source} it is given. Nothing is issued until each input Source
 * has issued at least one value; from then on, a new combined value is issued whenever any of the
 * input Sources issues a value.
 *
 * The shape of the output mirrors the shape of the input: passing an array yields arrays of the
 * Sources' values, while passing a dictionary object yields dictionary objects keyed the same way.
 *
 * @example
 * Passing a dictionary object to `zip`. Had an array been passed instead, the issued values
 * would be arrays of the Sources' values.
 *
 * ```ts
 * pipe(
 *   zip({
 *     x: fromValue(1),
 *     y: fromArray([2, 3]),
 *   }),
 *   subscribe(result => {
 *     // logs { x: 1, y: 2 } then { x: 1, y: 3 }
 *     console.log(result);
 *   })
 * );
 * ```
 */
interface zip {
  // Array form: the issued value is a tuple/array of each Source's value type.
  <Inputs extends readonly [...Source<any>[]]>(sources: [...Inputs]): Source<
    TypeOfSourceArray<Inputs>
  >;

  // Dictionary form: the issued value has the same keys, each holding that Source's value type.
  <Inputs extends Record<string, Source<any>>>(sources: Inputs): Source<{
    [Key in keyof Inputs]: TypeOfSource<Inputs[Key]>;
  }>;
}
48
/** Runtime implementation of `zip` for both the array and the dictionary form.
 *
 * @param sources - An array or a dictionary object of Sources to combine.
 * @returns A Source that issues a shallow copy of the latest-values buffer on each update, once
 * every key of `sources` has produced at least one value.
 */
function zip<T>(sources: Source<T>[] | Record<string, Source<T>>): Source<T[] | Record<string, T>> {
  // Counted once, from the own enumerable keys of the input.
  const total = Object.keys(sources).length;

  return sink => {
    // Keys whose Source has pushed at least one value.
    const seenKeys = new Set<string | number>();

    // Latest value per key; has the same container kind (array vs. object) as `sources`.
    const buffer: T[] | Record<string, T> = Array.isArray(sources) ? new Array(total) : {};
    // Talkback per key. The array form is pre-filled with the placeholder talkback so every slot
    // is callable before its Source has sent Start.
    const talkbacks: TalkbackFn[] | Record<string, TalkbackFn | void> = Array.isArray(sources)
      ? new Array(total).fill(talkbackPlaceholder)
      : {};

    let ended = false; // set once End was forwarded or the sink sent Close
    let endedSources = 0; // how many input Sources have sent End so far
    let isComplete = false; // becomes true on the first emission and never resets
    let pullPending = false; // the sink's Pull was forwarded and has not yet been answered

    // Sends the given talkback signal to every talkback currently stored.
    const forward = (kind: TalkbackKind) => {
      for (const key in talkbacks) talkbacks[key](kind);
    };

    for (const key in sources) {
      (sources[key] as Source<T>)(signal => {
        if (signal === SignalKind.End) {
          if (endedSources < total - 1) {
            // Other Sources are still running; just count this one.
            endedSources++;
          } else {
            // The last remaining Source ended: end the combined Source.
            ended = true;
            sink(SignalKind.End);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          talkbacks[key] = signal[0];
          return;
        }

        // Pushes arriving after the combined Source has ended are dropped.
        if (ended) return;

        buffer[key] = signal[0];
        seenKeys.add(key);

        if (isComplete || seenKeys.size >= total) {
          // Every key has a value: issue a copy so later writes don't alter issued values.
          isComplete = true;
          pullPending = false;
          sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer }));
        } else if (pullPending) {
          // This push answers the forwarded Pull, but the buffer is still incomplete.
          pullPending = false;
        } else {
          // No Pull is pending: ask each Source that has not produced a value yet for one.
          for (const missing in sources) {
            if (!seenKeys.has(missing)) {
              (talkbacks[missing] || talkbackPlaceholder)(TalkbackKind.Pull);
            }
          }
        }
      });
    }

    // Start is sent to the sink only after all input Sources have been subscribed to.
    sink(
      start(signal => {
        if (ended) return;

        if (signal === TalkbackKind.Close) {
          ended = true;
          forward(TalkbackKind.Close);
        } else if (!pullPending) {
          // Forward at most one Pull at a time to the inputs.
          pullPending = true;
          forward(TalkbackKind.Pull);
        }
      })
    );
  };
}
108
109export { zip };
110
/** Merges the most recent values of all given sources into a Source of tuple values.
 *
 * @see {@link zip | `zip`}, which this helper delegates to.
 * @param sources - A variadic list of {@link Source} arguments.
 * @returns A {@link Source} that issues a combined array each time one of the inputs updates.
 *
 * @remarks
 * `combine` accepts one or more {@link Source | Sources} as separate arguments. As soon as each
 * input Source has issued at least one value, it issues an array holding every Source's latest
 * value, and afterwards issues a new array whenever any of the Sources issues a value.
 *
 * @example
 *
 * ```ts
 * pipe(
 *   combine(fromValue(1), fromValue(2)),
 *   subscribe(result => {
 *     console.log(result); // logs [1, 2]
 *   })
 * );
 * ```
 */
export function combine<Sources extends Source<any>[]>(
  ...sources: Sources
): Source<TypeOfSourceArray<Sources>> {
  // The rest arguments are handed to `zip` as a single array. Its result is widened here so that
  // the declared tuple return type of this function applies.
  const combined = zip(sources) as Source<any>;
  return combined;
}