1import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
2import { push, start } from '../helpers';
3
4/* This tests a noop operator for passive Pull talkback signals.
5 A Pull will be sent from the sink upwards and should pass through
6 the operator until the source receives it, which then pushes a
7 value down. */
8export const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
9 it('responds to Pull talkback signals (spec)', () => {
10 let talkback: TalkbackFn | null = null;
11 let pushes = 0;
12 const values: any[] = [];
13
14 const source: Source<any> = sink => {
15 sink(
16 start(signal => {
17 if (!pushes && signal === TalkbackKind.Pull) {
18 pushes++;
19 sink(push(0));
20 }
21 })
22 );
23 };
24
25 const sink: Sink<any> = signal => {
26 expect(signal).not.toBe(SignalKind.End);
27 if (signal === SignalKind.End) {
28 /*noop*/
29 } else if (signal.tag === SignalKind.Push) {
30 values.push(signal[0]);
31 } else {
32 talkback = signal[0];
33 }
34 };
35
36 operator(source)(sink);
37 // The Start signal should always come in immediately
38 expect(talkback).not.toBe(null);
39 // No Push signals should be issued initially
40 expect(values).toEqual([]);
41
42 // When pulling a value we expect an immediate response
43 talkback!(TalkbackKind.Pull);
44 jest.runAllTimers();
45 expect(values).toEqual([output]);
46 });
47};
48
49/* This tests a noop operator for regular, active Push signals.
50 A Push will be sent downwards from the source, through the
51 operator to the sink. Pull events should be let through from
52 the sink after every Push event. */
53export const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
54 it('responds to eager Push signals (spec)', () => {
55 const values: any[] = [];
56 let talkback: TalkbackFn | null = null;
57 let sink: Sink<any> | null = null;
58 let pulls = 0;
59
60 const source: Source<any> = _sink => {
61 (sink = _sink)(
62 start(signal => {
63 if (signal === TalkbackKind.Pull) pulls++;
64 })
65 );
66 };
67
68 operator(source)(signal => {
69 expect(signal).not.toBe(SignalKind.End);
70 if (signal === SignalKind.End) {
71 /*noop*/
72 } else if (signal.tag === SignalKind.Start) {
73 talkback = signal[0];
74 } else if (signal.tag === SignalKind.Push) {
75 values.push(signal[0]);
76 talkback!(TalkbackKind.Pull);
77 }
78 });
79
80 // No Pull signals should be issued initially
81 expect(pulls).toBe(0);
82
83 // When pushing a value we expect an immediate response
84 sink!(push(0));
85 jest.runAllTimers();
86 expect(values).toEqual([result]);
87 // Subsequently the Pull signal should have travelled upwards
88 expect(pulls).toBe(1);
89 });
90};
91
92/* This tests a noop operator for Close talkback signals from the sink.
93 A Close signal will be sent, which should be forwarded to the source,
94 which then ends the communication without sending an End signal. */
95export const passesSinkClose = (operator: Operator<any, any>) => {
96 it('responds to Close signals from sink (spec)', () => {
97 let talkback: TalkbackFn | null = null;
98 let closing = 0;
99
100 const source: Source<any> = sink => {
101 sink(
102 start(signal => {
103 if (signal === TalkbackKind.Pull && !closing) {
104 sink(push(0));
105 } else if (signal === TalkbackKind.Close) {
106 closing++;
107 }
108 })
109 );
110 };
111
112 const sink: Sink<any> = signal => {
113 expect(signal).not.toBe(SignalKind.End);
114 if (signal === SignalKind.End) {
115 /*noop*/
116 } else if (signal.tag === SignalKind.Push) {
117 talkback!(TalkbackKind.Close);
118 } else {
119 talkback = signal[0];
120 }
121 };
122
123 operator(source)(sink);
124
125 // When pushing a value we expect an immediate close signal
126 talkback!(TalkbackKind.Pull);
127 jest.runAllTimers();
128 expect(closing).toBe(1);
129 });
130};
131
132/* This tests a noop operator for End signals from the source.
133 A Push and End signal will be sent after the first Pull talkback
134 signal from the sink, which shouldn't lead to any extra Close or Pull
135 talkback signals. */
136export const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
137 it('passes on immediate Push then End signals from source (spec)', () => {
138 const signals: Signal<any>[] = [];
139 let talkback: TalkbackFn | null = null;
140 let pulls = 0;
141 let ending = 0;
142
143 const source: Source<any> = sink => {
144 sink(
145 start(signal => {
146 expect(signal).not.toBe(TalkbackKind.Close);
147 if (signal === TalkbackKind.Pull) {
148 pulls++;
149 if (pulls === 1) {
150 sink(push(0));
151 sink(SignalKind.End);
152 }
153 }
154 })
155 );
156 };
157
158 const sink: Sink<any> = signal => {
159 if (signal === SignalKind.End) {
160 signals.push(signal);
161 ending++;
162 } else if (signal.tag === SignalKind.Push) {
163 signals.push(signal);
164 } else {
165 talkback = signal[0];
166 }
167 };
168
169 operator(source)(sink);
170
171 // When pushing a value we expect an immediate Push then End signal
172 talkback!(TalkbackKind.Pull);
173 jest.runAllTimers();
174 expect(ending).toBe(1);
175 expect(signals).toEqual([push(result), SignalKind.End]);
176 // Also no additional pull event should be created by the operator
177 expect(pulls).toBe(1);
178 });
179};
180
181/* This tests a noop operator for End signals from the source
182 after the first pull in response to another.
183 This is similar to passesSourceEnd but more well behaved since
184 mergeMap/switchMap/concatMap are eager operators. */
185export const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
186 it('passes on End signals from source (spec)', () => {
187 const signals: Signal<any>[] = [];
188 let talkback: TalkbackFn | null = null;
189 let pulls = 0;
190 let ending = 0;
191
192 const source: Source<any> = sink => {
193 sink(
194 start(signal => {
195 expect(signal).not.toBe(TalkbackKind.Close);
196 if (signal === TalkbackKind.Pull) {
197 pulls++;
198 if (pulls <= 2) {
199 sink(push(0));
200 } else {
201 sink(SignalKind.End);
202 }
203 }
204 })
205 );
206 };
207
208 const sink: Sink<any> = signal => {
209 if (signal === SignalKind.End) {
210 signals.push(signal);
211 ending++;
212 } else if (signal.tag === SignalKind.Push) {
213 signals.push(signal);
214 talkback!(TalkbackKind.Pull);
215 } else {
216 talkback = signal[0];
217 }
218 };
219
220 operator(source)(sink);
221
222 // When pushing a value we expect an immediate Push then End signal
223 talkback!(TalkbackKind.Pull);
224 jest.runAllTimers();
225 expect(ending).toBe(1);
226 expect(pulls).toBe(3);
227 expect(signals).toEqual([push(result), push(result), SignalKind.End]);
228 });
229};
230
231/* This tests a noop operator for Start signals from the source.
232 When the operator's sink is started by the source it'll receive
233 a Start event. As a response it should never send more than one
234 Start signals to the sink. */
235export const passesSingleStart = (operator: Operator<any, any>) => {
236 it('sends a single Start event to the incoming sink (spec)', () => {
237 let starts = 0;
238
239 const source: Source<any> = sink => {
240 sink(start(() => {}));
241 };
242
243 const sink: Sink<any> = signal => {
244 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
245 starts++;
246 }
247 };
248
249 // When starting the operator we expect a single start event on the sink
250 operator(source)(sink);
251 expect(starts).toBe(1);
252 });
253};
254
255/* This tests a noop operator for silence after End signals from the source.
256 When the operator receives the End signal it shouldn't forward any other
257 signals to the sink anymore.
258 This isn't a strict requirement, but some operators should ensure that
259 all sources are well behaved. This is particularly true for operators
260 that either Close sources themselves or may operate on multiple sources. */
261export const passesStrictEnd = (operator: Operator<any, any>) => {
262 it('stops all signals after End has been received (spec: strict end)', () => {
263 let pulls = 0;
264 const signals: Signal<any>[] = [];
265
266 const source: Source<any> = sink => {
267 sink(
268 start(signal => {
269 if (signal === TalkbackKind.Pull) {
270 pulls++;
271 sink(SignalKind.End);
272 sink(push(123));
273 }
274 })
275 );
276 };
277
278 const sink: Sink<any> = signal => {
279 if (signal === SignalKind.End) {
280 signals.push(signal);
281 } else if (signal.tag === SignalKind.Push) {
282 signals.push(signal);
283 } else {
284 signal[0](TalkbackKind.Pull);
285 }
286 };
287
288 operator(source)(sink);
289
290 // The Push signal should've been dropped
291 jest.runAllTimers();
292 expect(signals).toEqual([SignalKind.End]);
293 expect(pulls).toBe(1);
294 });
295
296 it('stops all signals after Close has been received (spec: strict close)', () => {
297 const signals: Signal<any>[] = [];
298
299 const source: Source<any> = sink => {
300 sink(
301 start(signal => {
302 if (signal === TalkbackKind.Close) {
303 sink(push(123));
304 }
305 })
306 );
307 };
308
309 const sink: Sink<any> = signal => {
310 if (signal === SignalKind.End) {
311 signals.push(signal);
312 } else if (signal.tag === SignalKind.Push) {
313 signals.push(signal);
314 } else {
315 signal[0](TalkbackKind.Close);
316 }
317 };
318
319 operator(source)(sink);
320
321 // The Push signal should've been dropped
322 jest.runAllTimers();
323 expect(signals).toEqual([]);
324 });
325};
326
327/* This tests an immediately closing operator for End signals to
328 the sink and Close signals to the source.
329 When an operator closes immediately we expect to see a Close
330 signal at the source and an End signal to the sink, since the
331 closing operator is expected to end the entire chain. */
332export const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
333 it('closes the source and ends the sink correctly (spec: ending operator)', () => {
334 let closing = 0;
335 let ending = 0;
336
337 const source: Source<any> = sink => {
338 sink(
339 start(signal => {
340 // For some operator tests we do need to send a single value
341 if (signal === TalkbackKind.Pull) {
342 sink(push(null));
343 } else {
344 closing++;
345 }
346 })
347 );
348 };
349
350 const sink: Sink<any> = signal => {
351 if (signal === SignalKind.End) {
352 ending++;
353 } else if (signal.tag === SignalKind.Start) {
354 signal[0](TalkbackKind.Pull);
355 }
356 };
357
358 // We expect the operator to immediately end and close
359 closingOperator(source)(sink);
360 expect(closing).toBe(1);
361 expect(ending).toBe(1);
362 });
363};
364
365export const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
366 it('passes an async push with an async end (spec)', () => {
367 let hasPushed = false;
368 const signals: Signal<any>[] = [];
369
370 const source: Source<any> = sink => {
371 sink(
372 start(signal => {
373 if (signal === TalkbackKind.Pull && !hasPushed) {
374 hasPushed = true;
375 setTimeout(() => sink(push(0)), 10);
376 setTimeout(() => sink(SignalKind.End), 20);
377 }
378 })
379 );
380 };
381
382 const sink: Sink<any> = signal => {
383 if (signal === SignalKind.End) {
384 signals.push(signal);
385 } else if (signal.tag === SignalKind.Push) {
386 signals.push(signal);
387 } else {
388 setTimeout(() => {
389 signal[0](TalkbackKind.Pull);
390 }, 5);
391 }
392 };
393
394 // We initially expect to see the push signal
395 // Afterwards after all timers all other signals come in
396 operator(source)(sink);
397 expect(signals.length).toBe(0);
398 jest.advanceTimersByTime(5);
399 expect(hasPushed).toBeTruthy();
400 jest.runAllTimers();
401
402 expect(signals).toEqual([push(result), SignalKind.End]);
403 });
404};