1import { it, expect, vi } from 'vitest';
2
3import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
4import { push, start } from '../helpers';
5
/**
 * Spec: a passive Pull issued by the sink must travel up to the source.
 *
 * The sink stores the talkback it receives with the Start signal and then
 * sends a single Pull through the operator. The test source answers only its
 * first Pull, by pushing `0`; the sink must end up with exactly `output`.
 *
 * @param operator - The operator under test.
 * @param output - The value the sink is expected to receive for the pushed `0`.
 */
export const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
  it('responds to Pull talkback signals (spec)', () => {
    const received: any[] = [];
    let pullSource: TalkbackFn | null = null;
    let pushCount = 0;

    const upstream: Source<any> = downstream => {
      const onTalkback: TalkbackFn = signal => {
        // Only the very first Pull is answered with a value
        if (pushCount > 0 || signal !== TalkbackKind.Pull) return;
        pushCount++;
        downstream(push(0));
      };

      downstream(start(onTalkback));
    };

    const observer: Sink<any> = signal => {
      // This spec never expects the stream to end
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        received.push(signal[0]);
      } else {
        pullSource = signal[0];
      }
    };

    operator(upstream)(observer);
    // Start has to arrive synchronously on subscription...
    expect(pullSource).not.toBe(null);
    // ...and nothing may be pushed before the sink asks for it
    expect(received).toEqual([]);

    // A Pull from the sink should yield the expected value
    pullSource!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(received).toEqual([output]);
  });
};
50
/**
 * Spec: an active Push from the source must travel down to the sink.
 *
 * The test source hands out its sink so the test can push `0` on its own
 * initiative. The sink answers every Push with a Pull, and that Pull is
 * expected to be let through to the source.
 *
 * @param operator - The operator under test.
 * @param result - The value the sink is expected to receive for the pushed `0`.
 */
export const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
  it('responds to eager Push signals (spec)', () => {
    const received: any[] = [];
    let pullCount = 0;
    let operatorSink: Sink<any> | null = null;
    let talkbackFn: TalkbackFn | null = null;

    const upstream: Source<any> = incoming => {
      // Keep the operator's sink around so the test can push into it later
      operatorSink = incoming;
      incoming(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    const observer: Sink<any> = signal => {
      // This spec never expects the stream to end
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) return;

      switch (signal.tag) {
        case SignalKind.Start:
          talkbackFn = signal[0];
          break;
        case SignalKind.Push:
          received.push(signal[0]);
          // Ask for more after every value
          talkbackFn!(TalkbackKind.Pull);
          break;
      }
    };

    operator(upstream)(observer);

    // Subscribing alone must not cause any Pull
    expect(pullCount).toBe(0);

    // Pushing a value should deliver the expected result to the sink
    operatorSink!(push(0));
    vi.runAllTimers();
    expect(received).toEqual([result]);
    // The sink's Pull in response should have reached the source once
    expect(pullCount).toBe(1);
  });
};
93
/**
 * Spec: a Close talkback signal from the sink must be forwarded to the source.
 *
 * The sink pulls once, and closes as soon as it receives the resulting Push.
 * The source counts the Close signals it sees and stops answering Pulls once
 * closed. The sink asserts that it never receives an End signal.
 *
 * @param operator - The operator under test.
 */
export const passesSinkClose = (operator: Operator<any, any>) => {
  it('responds to Close signals from sink (spec)', () => {
    let talkback: TalkbackFn | null = null;
    let closing = 0;

    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull && !closing) {
            sink(push(0));
          } else if (signal === TalkbackKind.Close) {
            closing++;
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      // Closing from the sink must not be echoed back as an End signal
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        talkback!(TalkbackKind.Close);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately; asserting it here
    // gives a clear failure instead of a TypeError on the call below
    expect(talkback).not.toBe(null);

    // When pulling a value the sink closes on the Push it gets in response
    talkback!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(closing).toBe(1);
  });
};
133
/**
 * Spec: a Push immediately followed by End from the source is passed on.
 *
 * On the first Pull the source pushes `0` and ends right away. The sink must
 * see exactly that Push (as `result`) and one End. The source must receive
 * neither a Close signal nor any additional Pull.
 *
 * @param operator - The operator under test.
 * @param result - The value the sink is expected to receive for the pushed `0`.
 */
export const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on immediate Push then End signals from source (spec)', () => {
    const signals: Signal<any>[] = [];
    let talkback: TalkbackFn | null = null;
    let pulls = 0;
    let ending = 0;

    const source: Source<any> = sink => {
      sink(
        start(signal => {
          // The source ends by itself, so nothing should close it
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal === TalkbackKind.Pull) {
            pulls++;
            if (pulls === 1) {
              sink(push(0));
              sink(SignalKind.End);
            }
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
        ending++;
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately; asserting it here
    // gives a clear failure instead of a TypeError on the call below
    expect(talkback).not.toBe(null);

    // When pulling a value we expect an immediate Push then End signal
    talkback!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(ending).toBe(1);
    expect(signals).toEqual([push(result), SignalKind.End]);
    // Also no additional pull event should be created by the operator
    expect(pulls).toBe(1);
  });
};
182
/**
 * Spec: an End signal sent by the source in response to a later Pull is
 * passed on.
 *
 * The source answers the first two Pulls with a Push of `0` and the third
 * with End. The sink pulls again after every Push. This is similar to
 * passesSourceEnd but more well behaved, since mergeMap/switchMap/concatMap
 * are eager operators. The source must never receive a Close signal.
 *
 * @param operator - The operator under test.
 * @param result - The value the sink is expected to receive for each pushed `0`.
 */
export const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on End signals from source (spec)', () => {
    const signals: Signal<any>[] = [];
    let talkback: TalkbackFn | null = null;
    let pulls = 0;
    let ending = 0;

    const source: Source<any> = sink => {
      sink(
        start(signal => {
          // The source ends by itself, so nothing should close it
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal === TalkbackKind.Pull) {
            pulls++;
            if (pulls <= 2) {
              sink(push(0));
            } else {
              sink(SignalKind.End);
            }
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
        ending++;
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
        // Keep pulling so that the source eventually ends
        talkback!(TalkbackKind.Pull);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately; asserting it here
    // gives a clear failure instead of a TypeError on the call below
    expect(talkback).not.toBe(null);

    // The first Pull starts a chain of two Push signals followed by End
    talkback!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(ending).toBe(1);
    expect(pulls).toBe(3);
    expect(signals).toEqual([push(result), push(result), SignalKind.End]);
  });
};
232
/**
 * Spec: the operator sends exactly one Start signal to its sink.
 *
 * The test source does nothing but start the operator's sink with a talkback
 * that ignores every signal. The sink merely counts the Start signals it
 * receives.
 *
 * @param operator - The operator under test.
 */
export const passesSingleStart = (operator: Operator<any, any>) => {
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    const upstream: Source<any> = downstream => {
      const ignoreTalkback: TalkbackFn = () => {};
      downstream(start(ignoreTalkback));
    };

    const observer: Sink<any> = signal => {
      if (signal === SignalKind.End) return;
      if (signal.tag === SignalKind.Start) startCount++;
    };

    // Subscribing should produce one Start signal on the sink, no more
    operator(upstream)(observer);
    expect(startCount).toBe(1);
  });
};
256
/**
 * Spec (strict): the operator goes silent once the stream is over.
 *
 * Two cases are registered:
 * - strict end: the source sends End and then a stray Push; only the End may
 *   reach the sink.
 * - strict close: the sink closes immediately and the source answers the
 *   Close with a stray Push; nothing may reach the sink.
 *
 * This isn't a strict requirement for every operator, but operators that
 * Close sources themselves or operate on multiple sources should make sure
 * that badly behaved sources are silenced.
 *
 * @param operator - The operator under test.
 */
export const passesStrictEnd = (operator: Operator<any, any>) => {
  // Builds a sink that logs every Push/End signal and answers Start with `reply`
  const recordInto =
    (log: Signal<any>[], reply: TalkbackKind): Sink<any> =>
    signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Push) {
        log.push(signal);
      } else {
        signal[0](reply);
      }
    };

  it('stops all signals after End has been received (spec: strict end)', () => {
    const seen: Signal<any>[] = [];
    let pullCount = 0;

    const upstream: Source<any> = downstream => {
      downstream(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;
          pullCount++;
          downstream(SignalKind.End);
          // Misbehave on purpose: push after the stream has ended
          downstream(push(123));
        })
      );
    };

    operator(upstream)(recordInto(seen, TalkbackKind.Pull));

    // Only the End signal may get through; the late Push has to be dropped
    vi.runAllTimers();
    expect(seen).toEqual([SignalKind.End]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const seen: Signal<any>[] = [];

    const upstream: Source<any> = downstream => {
      downstream(
        start(signal => {
          // Misbehave on purpose: push in response to being closed
          if (signal === TalkbackKind.Close) downstream(push(123));
        })
      );
    };

    operator(upstream)(recordInto(seen, TalkbackKind.Close));

    // The Push sent after the Close has to be dropped
    vi.runAllTimers();
    expect(seen).toEqual([]);
  });
};
328
/**
 * Spec: an immediately closing operator ends the whole chain.
 *
 * The sink pulls as soon as it is started. The source answers Pulls with a
 * `null` value (some operators need one value before they close) and counts
 * every other talkback signal as a Close. Right after subscribing, the source
 * must have been closed once and the sink must have been ended once.
 *
 * @param closingOperator - The closing operator under test.
 */
export const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closeCount = 0;
    let endCount = 0;

    const upstream: Source<any> = downstream => {
      downstream(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount++;
            return;
          }
          // For some operator tests we do need to send a single value
          downstream(push(null));
        })
      );
    };

    const observer: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        endCount++;
        return;
      }
      // Pushed values are ignored; only Start triggers a Pull
      if (signal.tag === SignalKind.Start) signal[0](TalkbackKind.Pull);
    };

    // Closing and ending are both expected to happen synchronously
    closingOperator(upstream)(observer);
    expect(closeCount).toBe(1);
    expect(endCount).toBe(1);
  });
};
366
/**
 * Spec: an asynchronous Push followed by an asynchronous End is passed on.
 *
 * The sink schedules its Pull 5ms after being started. On the first Pull the
 * source schedules a Push of `0` after 10ms and an End after 20ms. Once all
 * timers have run, the sink must have seen the Push (as `result`) and then
 * the End, in that order.
 *
 * @param operator - The operator under test.
 * @param result - The value the sink is expected to receive for the pushed `0`.
 */
export const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
  it('passes an async push with an async end (spec)', () => {
    const seen: Signal<any>[] = [];
    let hasPushed = false;

    const upstream: Source<any> = downstream => {
      downstream(
        start(signal => {
          // Only the first Pull schedules the Push/End sequence
          if (hasPushed || signal !== TalkbackKind.Pull) return;
          hasPushed = true;
          setTimeout(() => downstream(push(0)), 10);
          setTimeout(() => downstream(SignalKind.End), 20);
        })
      );
    };

    const observer: Sink<any> = signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Push) {
        seen.push(signal);
        return;
      }

      // Start: pull a little later instead of synchronously
      const pull = signal[0];
      setTimeout(() => {
        pull(TalkbackKind.Pull);
      }, 5);
    };

    // Nothing may arrive synchronously on subscription
    operator(upstream)(observer);
    expect(seen.length).toBe(0);
    // After 5ms the delayed Pull must have reached the source
    vi.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    // Once the remaining timers have run, Push and End have both come in
    vi.runAllTimers();

    expect(seen).toEqual([push(result), SignalKind.End]);
  });
};