1import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
2import { push, start } from '../helpers';
3
4import * as sources from '../sources';
5import * as sinks from '../sinks';
6import * as operators from '../operators';
7
/* Spec: passive Pull talkback signals through a noop operator.
   The sink sends a Pull upwards; it has to pass through the operator
   and reach the source, which answers the first Pull with one Push.
   `output` is the value the sink is expected to observe for that Push. */
const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
  it('responds to Pull talkback signals (spec)', () => {
    const received: any[] = [];
    let upstream: TalkbackFn | null = null;
    let hasPushed = false;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          // Only the first Pull is answered; everything else is ignored
          if (hasPushed || request !== TalkbackKind.Pull) return;
          hasPushed = true;
          observer(push(0));
        })
      );
    };

    const sink: Sink<any> = signal => {
      // This spec never ends the source, so End must not show up here
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        received.push(signal[0]);
      } else {
        upstream = signal[0];
      }
    };

    operator(source)(sink);
    // Start has to arrive synchronously ...
    expect(upstream).not.toBe(null);
    // ... while no value may be pushed before the sink asks for one
    expect(received).toEqual([]);

    // A Pull must have been answered once pending timers are flushed
    upstream!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(received).toEqual([output]);
  });
};
52
/* Spec: active (eager) Push signals through a noop operator.
   The source pushes a value without being asked; it has to travel down
   through the operator to the sink. The sink answers each Push with a
   Pull, which in turn must reach the source. */
const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
  it('responds to eager Push signals (spec)', () => {
    const received: any[] = [];
    let upstream: TalkbackFn | null = null;
    let downstream: Sink<any> | null = null;
    let pullCount = 0;

    const source: Source<any> = observer => {
      // Keep the operator's sink around so the test can push into it later
      downstream = observer;
      observer(
        start(request => {
          if (request === TalkbackKind.Pull) pullCount += 1;
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Start) {
        upstream = signal[0];
      } else if (signal.tag === SignalKind.Push) {
        received.push(signal[0]);
        // Every value is acknowledged with a Pull
        upstream!(TalkbackKind.Pull);
      }
    };

    operator(source)(sink);

    // Starting alone must not make the operator pull
    expect(pullCount).toBe(0);

    // A pushed value has to show up at the sink once timers are flushed
    downstream!(push(0));
    jest.runAllTimers();
    expect(received).toEqual([result]);
    // The sink's Pull must then have made it up to the source
    expect(pullCount).toBe(1);
  });
};
95
/* Spec: Close talkback signals from the sink through a noop operator.
   The sink closes as soon as it sees the first Push. The Close has to be
   forwarded to the source exactly once, and the sink must never receive
   an End signal. */
const passesSinkClose = (operator: Operator<any, any>) => {
  it('responds to Close signals from sink (spec)', () => {
    let upstream: TalkbackFn | null = null;
    let closeCount = 0;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          if (request === TalkbackKind.Close) {
            closeCount += 1;
          } else if (request === TalkbackKind.Pull && closeCount === 0) {
            // Values are only handed out while the source is still open
            observer(push(0));
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        // Close right away once the first value comes in
        upstream!(TalkbackKind.Close);
      } else {
        upstream = signal[0];
      }
    };

    operator(source)(sink);

    // Pulling yields a Push, which the sink answers with a single Close
    upstream!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(closeCount).toBe(1);
  });
};
135
/* Spec: End signals from the source through a noop operator.
   The source answers the first Pull with a Push immediately followed by
   an End. The operator must pass both on, and must not send any extra
   Pull or Close talkback signals to the source. */
const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on immediate Push then End signals from source (spec)', () => {
    const log: Signal<any>[] = [];
    let upstream: TalkbackFn | null = null;
    let pullCount = 0;
    let endCount = 0;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          // A source that ends by itself must not be closed by the operator
          expect(request).not.toBe(TalkbackKind.Close);
          if (request !== TalkbackKind.Pull) return;

          pullCount += 1;
          if (pullCount === 1) {
            observer(push(0));
            observer(SignalKind.End);
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        log.push(signal);
        endCount += 1;
        return;
      }
      if (signal.tag === SignalKind.Push) {
        log.push(signal);
        return;
      }
      upstream = signal[0];
    };

    operator(source)(sink);

    // One Pull must result in exactly one Push followed by one End
    upstream!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(log).toEqual([push(result), SignalKind.End]);
    // The operator itself must not have pulled again
    expect(pullCount).toBe(1);
  });
};
184
/* Spec: End signals from the source that arrive in response to a later
   Pull. The source answers the first two Pulls with a Push each and any
   further Pull with an End; the sink pulls again after every Push.
   This is the better-behaved sibling of passesSourceEnd, used since
   mergeMap/switchMap/concatMap are eager operators. */
const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on End signals from source (spec)', () => {
    const log: Signal<any>[] = [];
    let upstream: TalkbackFn | null = null;
    let pullCount = 0;
    let endCount = 0;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          expect(request).not.toBe(TalkbackKind.Close);
          if (request !== TalkbackKind.Pull) return;

          pullCount += 1;
          // Two values first, then the source ends on the following Pull
          observer(pullCount <= 2 ? push(0) : SignalKind.End);
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        log.push(signal);
        endCount += 1;
        return;
      }
      if (signal.tag === SignalKind.Push) {
        log.push(signal);
        // Keep the sequence going by asking for the next value
        upstream!(TalkbackKind.Pull);
        return;
      }
      upstream = signal[0];
    };

    operator(source)(sink);

    // The initial Pull kicks off the Push, Push, End sequence
    upstream!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(pullCount).toBe(3);
    expect(log).toEqual([push(result), push(result), SignalKind.End]);
  });
};
234
/* Spec: Start signals from the source through a noop operator.
   Once the source has started the operator's sink, the operator must
   hand exactly one Start signal to the sink it was given. */
const passesSingleStart = (operator: Operator<any, any>) => {
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    // Starts immediately and ignores every talkback signal
    const source: Source<any> = observer => {
      observer(start(() => {}));
    };

    const sink: Sink<any> = signal => {
      const isStart = signal !== SignalKind.End && signal.tag === SignalKind.Start;
      if (isStart) startCount += 1;
    };

    // Subscribing must produce one Start on the sink, no more and no less
    operator(source)(sink);
    expect(startCount).toBe(1);
  });
};
258
/* Spec: silence after the stream has terminated.
   Once the operator has received an End from the source (or a Close from
   the sink) it must not forward any further signals to the sink.
   This is not a strict requirement for every operator, but operators
   that Close sources themselves or that work with multiple sources
   should make sure that misbehaving sources are kept in check. */
const passesStrictEnd = (operator: Operator<any, any>) => {
  // Builds a sink that records End and Push signals into `log` and answers
  // any other signal (i.e. Start) by sending `reply` through its talkback
  const recordInto = (log: Signal<any>[], reply: TalkbackKind): Sink<any> => signal => {
    if (signal === SignalKind.End) {
      log.push(signal);
      return;
    }
    if (signal.tag === SignalKind.Push) {
      log.push(signal);
      return;
    }
    signal[0](reply);
  };

  it('stops all signals after End has been received (spec: strict end)', () => {
    const log: Signal<any>[] = [];
    let pullCount = 0;

    // Misbehaving source: pushes a value after it has already ended
    const source: Source<any> = observer => {
      observer(
        start(request => {
          if (request !== TalkbackKind.Pull) return;
          pullCount += 1;
          observer(SignalKind.End);
          observer(push(123));
        })
      );
    };

    operator(source)(recordInto(log, TalkbackKind.Pull));

    // Only the End may come through; the late Push has to be dropped
    jest.runAllTimers();
    expect(log).toEqual([SignalKind.End]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const log: Signal<any>[] = [];

    // Misbehaving source: pushes a value in response to being closed
    const source: Source<any> = observer => {
      observer(
        start(request => {
          if (request === TalkbackKind.Close) observer(push(123));
        })
      );
    };

    operator(source)(recordInto(log, TalkbackKind.Close));

    // Nothing at all may reach a sink that has closed the stream
    jest.runAllTimers();
    expect(log).toEqual([]);
  });
};
330
/* Spec: operators that terminate the stream on their own.
   An immediately closing operator is expected to end the whole chain:
   the source has to receive a Close talkback signal and the sink has
   to receive an End signal, each exactly once. */
const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closeCount = 0;
    let endCount = 0;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          if (request !== TalkbackKind.Pull) {
            closeCount += 1;
            return;
          }
          // Some operator tests need a single value before they close
          observer(push(null));
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        endCount += 1;
        return;
      }
      if (signal.tag === SignalKind.Start) signal[0](TalkbackKind.Pull);
    };

    // Ending and closing are both expected to happen synchronously
    closingOperator(source)(sink);
    expect(closeCount).toBe(1);
    expect(endCount).toBe(1);
  });
};
368
/* Spec: asynchronous sequences through a noop operator.
   The sink sends its Pull 5ms after receiving Start. The source answers
   the first Pull by scheduling a Push 10ms later and an End 20ms later.
   Both signals must reach the sink in that order, with the Push
   carrying `result`. */
const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
  it('passes an async push with an async end (spec)', () => {
    const log: Signal<any>[] = [];
    let hasPushed = false;

    const source: Source<any> = observer => {
      observer(
        start(request => {
          if (hasPushed || request !== TalkbackKind.Pull) return;
          hasPushed = true;
          setTimeout(() => observer(push(0)), 10);
          setTimeout(() => observer(SignalKind.End), 20);
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        log.push(signal);
        return;
      }
      if (signal.tag === SignalKind.Push) {
        log.push(signal);
        return;
      }
      // Start: defer the first Pull instead of sending it synchronously
      setTimeout(() => {
        signal[0](TalkbackKind.Pull);
      }, 5);
    };

    operator(source)(sink);
    // Nothing may reach the sink synchronously
    expect(log.length).toBe(0);
    // After 5ms the sink's Pull must have arrived at the source
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    // Flushing the remaining timers delivers the Push and then the End
    jest.runAllTimers();

    expect(log).toEqual([push(result), SignalKind.End]);
  });
};
409
// Every test runs on Jest's fake timers so that the specs can advance and
// flush timers explicitly.
beforeEach(function installFakeTimers() {
  jest.useFakeTimers();
});
413
describe('combine', () => {
  // Combines a constant `0` source with the source under test, which is
  // why the shared specs expect `[0, 0]` for every pushed `0`
  function passthrough(source: Source<any>) {
    return operators.combine(sources.fromValue(0), source);
  }

  passesPassivePull(passthrough, [0, 0]);
  passesActivePush(passthrough, [0, 0]);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough, [0, 0]);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);

  it('emits the zipped values of two sources', () => {
    const { source: left$, next: emitLeft } = sources.makeSubject();
    const { source: right$, next: emitRight } = sources.makeSubject();
    const onValue = jest.fn();

    const combined$ = operators.combine(left$, right$);
    sinks.forEach(onValue)(combined$);

    // A value from one side alone is not enough to emit
    emitLeft(1);
    expect(onValue).not.toHaveBeenCalled();
    // As soon as both sides have a value the pair is emitted
    emitRight(2);
    expect(onValue).toHaveBeenCalledWith([1, 2]);
  });
});
437
describe('buffer', () => {
  // Notifier that answers every Pull with a `null` value and never ends
  const notifyOnPull: Source<any> = observer => {
    observer(
      start(request => {
        if (request === TalkbackKind.Pull) observer(push(null));
      })
    );
  };

  const passthrough = operators.buffer(notifyOnPull);

  passesPassivePull(passthrough, [0]);
  passesActivePush(passthrough, [0]);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough, [0]);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: flush } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const onBatch = jest.fn();

    const batched$ = operators.buffer(notifier$)(input$);
    sinks.forEach(onBatch)(batched$);

    // Values are held back until the notifier fires
    emit(1);
    emit(2);
    expect(onBatch).not.toHaveBeenCalled();

    flush(null);
    expect(onBatch).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains values received since the last flush
    emit(3);
    flush(null);
    expect(onBatch).toHaveBeenCalledWith([3]);
  });
});
474
describe('concatMap', () => {
  const passthrough = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: concatMap behaves the same as mergeMap & switchMap here
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sink = jest.fn();

    const flattened$ = operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(outer$);
    flattened$(sink);

    emit(1);
    emit(3);
    finish();

    expect(sink).toHaveBeenCalledTimes(6);
    expect(sink.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // Synchronous case: concatMap behaves the same as mergeMap & switchMap here
  it('lets inner sources finish when outer source ends', () => {
    const log: Signal<any>[] = [];
    const teardown = jest.fn();

    // Records every signal and, on Start, pulls once and closes right after
    const sink = (signal: Signal<any>) => {
      log.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        talkback(TalkbackKind.Pull);
        talkback(TalkbackKind.Close);
      }
    };

    const flattened$ = operators.concatMap(() => sources.make(() => teardown))(
      sources.fromValue(null)
    );
    flattened$(sink);

    expect(teardown).toHaveBeenCalled();
    expect(log).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where concatMap differs from mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = jest.fn();

    const toInner = (x: number) => operators.delay(5)(sources.fromArray([x, x * 2]));
    sinks.forEach(onValue)(operators.concatMap(toInner)(outer$));

    // After 14ms only the first inner source's values have been emitted
    jest.advanceTimersByTime(14);
    expect(onValue.mock.calls).toEqual([[1], [2]]);

    // The second inner source's values follow afterwards
    jest.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const onValue = jest.fn();

    // Inner source that emits its only value from a timer
    const toInner = () =>
      sources.make(observer => {
        setTimeout(() => observer.next(1));
        return () => {};
      });

    sinks.forEach(onValue)(operators.concatMap(toInner)(sources.fromValue(null)));

    jest.runAllTimers();
    expect(onValue).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const collected: any[] = [];

    const concatenated$ = operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])]);
    sinks.forEach(x => collected.push(x))(concatenated$);

    expect(collected).toEqual([1, 2, 3, 4]);
  });
});
571
describe('debounce', () => {
  // A zero timeout turns debounce into a passthrough once timers are flushed
  const noop = operators.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    // The first value is still pending after 50ms
    next(1);
    jest.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    // A second value restarts the wait: 99ms later nothing has been emitted
    next(2);
    jest.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    // Only after a full 100ms of silence is the latest value emitted
    jest.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();
    // Observes the End signal downstream of debounce so its timing can be checked
    const onEnd = jest.fn();

    sinks.forEach(fn)(operators.onEnd(onEnd)(operators.debounce(() => 100)(source)));

    next(1);
    complete();
    // While the debounce timer is pending neither the value nor the End
    // may have been forwarded
    expect(fn).not.toHaveBeenCalled();
    expect(onEnd).not.toHaveBeenCalled();

    jest.advanceTimersByTime(100);
    // The pending value is emitted exactly once, followed by the deferred End
    expect(fn).toHaveBeenCalledTimes(1);
    expect(fn).toHaveBeenCalledWith(1);
    expect(onEnd).toHaveBeenCalled();
  });
});
612
describe('delay', () => {
  const passthrough = operators.delay(0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('delays outputs by a specified delay timeout value', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = jest.fn();

    const delayed$ = operators.delay(100)(input$);
    sinks.forEach(onValue)(delayed$);

    // The value must not come through synchronously
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    // It arrives once the configured 100ms have passed
    jest.advanceTimersByTime(100);
    expect(onValue).toHaveBeenCalledWith(1);
  });
});
635
describe('filter', () => {
  // A predicate that accepts everything lets all values through
  const passthrough = operators.filter(() => true);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('prevents emissions for which a predicate fails', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = jest.fn();

    const truthy$ = operators.filter(x => !!x)(input$);
    sinks.forEach(onValue)(truthy$);

    // Falsy values are held back ...
    emit(false);
    expect(onValue).not.toHaveBeenCalled();

    // ... truthy ones are passed on
    emit(true);
    expect(onValue).toHaveBeenCalledWith(true);
  });
});
658
describe('map', () => {
  // Mapping with the identity function leaves every value untouched
  const passthrough = operators.map(x => x);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('maps over values given a transform function', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const incremented$ = operators.map((x: number) => x + 1)(input$);
    sinks.forEach(onValue)(incremented$);

    emit(1);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
678
describe('mergeMap', () => {
  const passthrough = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: mergeMap behaves the same as concatMap & switchMap here
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sink = jest.fn();

    const flattened$ = operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(outer$);
    flattened$(sink);

    emit(1);
    emit(3);
    finish();

    expect(sink.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // Synchronous case: mergeMap behaves the same as concatMap & switchMap here
  it('lets inner sources finish when outer source ends', () => {
    const log: Signal<any>[] = [];
    const teardown = jest.fn();

    // Records every signal and, on Start, pulls once and closes right after
    const sink = (signal: Signal<any>) => {
      log.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        talkback(TalkbackKind.Pull);
        talkback(TalkbackKind.Close);
      }
    };

    const flattened$ = operators.mergeMap(() => sources.make(() => teardown))(
      sources.fromValue(null)
    );
    flattened$(sink);

    expect(teardown).toHaveBeenCalled();
    expect(log).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where mergeMap differs from concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = jest.fn();

    const toInner = (x: number) => operators.delay(5)(sources.fromArray([x, x * 2]));
    sinks.forEach(onValue)(operators.mergeMap(toInner)(outer$));

    // Values of both inner sources end up interleaved
    jest.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  it('emits synchronous values in order', () => {
    const collected: any[] = [];

    const merged$ = operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])]);
    sinks.forEach(x => collected.push(x))(merged$);

    expect(collected).toEqual([1, 2, 3, 4]);
  });
});
755
describe('onEnd', () => {
  const passthrough = operators.onEnd(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesStrictEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('calls a callback when the source ends', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<any>();
    const onEndSpy = jest.fn();

    const observed$ = operators.onEnd(onEndSpy)(input$);
    sinks.forEach(() => {})(observed$);

    // Regular values do not trigger the callback
    emit(null);
    expect(onEndSpy).not.toHaveBeenCalled();

    // Completing the subject does
    finish();
    expect(onEndSpy).toHaveBeenCalled();
  });
});
779
describe('onPush', () => {
  const passthrough = operators.onPush(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesStrictEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('calls a callback when the source emits', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onPushSpy = jest.fn();

    const observed$ = operators.onPush(onPushSpy)(input$);
    sinks.forEach(() => {})(observed$);

    // The callback sees every value as it is emitted
    emit(1);
    expect(onPushSpy).toHaveBeenCalledWith(1);
    emit(2);
    expect(onPushSpy).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // `tap` is expected to be the very same function, not a copy
    expect(operators.onPush).toBe(operators.tap);
  });
});
806
describe('onStart', () => {
  const passthrough = operators.onStart(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('is called when the source starts', () => {
    let captured: Sink<any>;
    const onStartSpy = jest.fn();

    // Source that only remembers its sink so Start can be sent manually later
    const silent$: Source<any> = observer => {
      captured = observer;
    };

    const observed$ = operators.onStart(onStartSpy)(silent$);
    sinks.forEach(() => {})(observed$);

    // Subscribing alone does not count as a start
    expect(onStartSpy).not.toHaveBeenCalled();

    // The callback fires once the source sends its Start signal
    captured!(start(() => {}));
    expect(onStartSpy).toHaveBeenCalled();
  });
});
832
describe('sample', () => {
  // Notifier that answers every Pull with a `null` value and never ends
  const notifyOnPull: Source<any> = observer => {
    observer(
      start(request => {
        if (request === TalkbackKind.Pull) observer(push(null));
      })
    );
  };

  const passthrough = operators.sample(notifyOnPull);

  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: trigger } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = jest.fn();

    const sampled$ = operators.sample(notifier$)(input$);
    sinks.forEach(onValue)(sampled$);

    // Input values alone do not produce any output
    emit(1);
    emit(2);
    expect(onValue).not.toHaveBeenCalled();

    // The notifier releases the most recent input value
    trigger(null);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
865
describe('scan', () => {
  // A reducer that always returns the incoming value makes scan a passthrough
  const passthrough = operators.scan<any, any>((_acc, x) => x, null);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const sum = (acc: number, x: number) => acc + x;
    const runningTotal$ = operators.scan(sum, 0)(input$);
    sinks.forEach(onValue)(runningTotal$);

    // 0 + 1
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);
    // 1 + 2
    emit(2);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});
888
describe('share', () => {
  const passthrough = operators.share;
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  it('shares output values between sinks', () => {
    let emitOnce = () => {};

    const shared$: Source<any> = operators.share(observer => {
      observer(start(() => {}));
      // Expose a trigger that pushes one value and then ends the source
      emitOnce = () => {
        observer(push([0]));
        observer(SignalKind.End);
      };
    });

    const onValueA = jest.fn();
    const onValueB = jest.fn();

    sinks.forEach(onValueA)(shared$);
    sinks.forEach(onValueB)(shared$);
    emitOnce();

    expect(onValueA).toHaveBeenCalledWith([0]);
    expect(onValueB).toHaveBeenCalledWith([0]);
    // Both sinks must have received the very same array instance
    expect(onValueA.mock.calls[0][0]).toBe(onValueB.mock.calls[0][0]);
  });
});
922
describe('skip', () => {
  // Skipping zero values lets everything through
  const passthrough = operators.skip(0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('skips a number of values before emitting normally', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const afterFirst$ = operators.skip(1)(input$);
    sinks.forEach(onValue)(afterFirst$);

    // The first value is swallowed ...
    emit(1);
    expect(onValue).not.toHaveBeenCalled();
    // ... the second one is emitted
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
944
describe('skipUntil', () => {
  const passthrough = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);
  passesStrictEnd(passthrough);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: trigger } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const gated$ = operators.skipUntil(notifier$)(input$);
    sinks.forEach(onValue)(gated$);

    // Values before the notifier fires are dropped
    emit(1);
    expect(onValue).not.toHaveBeenCalled();
    // Values after it are passed on
    trigger(null);
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
969
describe('skipWhile', () => {
  // A predicate that never holds means nothing is skipped
  const passthrough = operators.skipWhile(() => false);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('skips values until one fails a predicate', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const aboveOne$ = operators.skipWhile((x: any) => x <= 1)(input$);
    sinks.forEach(onValue)(aboveOne$);

    // 1 satisfies the predicate and is skipped
    emit(1);
    expect(onValue).not.toHaveBeenCalled();
    // 2 fails the predicate and is emitted
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
991
describe('switchMap', () => {
  const passthrough = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Synchronous case: switchMap behaves the same as concatMap & mergeMap here
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sink = jest.fn();

    const flattened$ = operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(outer$);
    flattened$(sink);

    emit(1);
    emit(3);
    finish();

    expect(sink).toHaveBeenCalledTimes(6);
    expect(sink.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // Synchronous case: switchMap behaves the same as concatMap & mergeMap here
  it('lets inner sources finish when outer source ends', () => {
    const log: Signal<any>[] = [];
    const teardown = jest.fn();

    // Records every signal and, on Start, pulls once and closes right after
    const sink = (signal: Signal<any>) => {
      log.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        talkback(TalkbackKind.Pull);
        talkback(TalkbackKind.Close);
      }
    };

    const flattened$ = operators.switchMap(() => sources.make(() => teardown))(
      sources.fromValue(null)
    );
    flattened$(sink);

    expect(teardown).toHaveBeenCalled();
    expect(log).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where switchMap differs from concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = jest.fn();

    // Inner source: the first two interval ticks, scaled by the outer value
    const toInner = (x: number) => {
      const scaled$ = operators.map((y: number) => x * (y + 1))(sources.interval(5));
      return operators.take(2)(scaled$);
    };

    sinks.forEach(onValue)(operators.switchMap(toInner)(outer$));

    // The first inner source is cut short when the second outer value arrives
    jest.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [10], [20]]);
  });
});
1059
describe('take', () => {
  // The shared specs never emit ten values, so this limit is never reached
  const passthrough = operators.take(10);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // Taking zero values is expected to end the chain immediately
  const ending = operators.take(0);
  passesCloseAndEnd(ending);

  it('emits values until a maximum is reached', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = jest.fn();

    const firstOnly$ = operators.take(1)(input$);
    firstOnly$(sink);
    emit(1);

    // Start, the single allowed value, then End
    expect(sink).toHaveBeenCalledTimes(3);
    expect(sink.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});
1083
describe('takeUntil', () => {
  // With a notifier that never emits, takeUntil is a passthrough
  const passthrough = operators.takeUntil(sources.never);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // A notifier with a ready value is expected to end the chain immediately
  passesCloseAndEnd(operators.takeUntil(sources.fromValue(null)));

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: trigger } = sources.makeSubject<any>();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = jest.fn();

    const gated$ = operators.takeUntil(notifier$)(input$);
    gated$(sink);
    emit(1);

    // So far only Start and the first value have been seen
    expect(sink).toHaveBeenCalledTimes(2);
    expect(sink.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);

    // The notifier ends the stream
    trigger(null);
    expect(sink).toHaveBeenCalledTimes(3);
    expect(sink.mock.calls[2][0]).toEqual(SignalKind.End);
  });
});
1113
describe('takeWhile', () => {
  // A predicate that always holds makes takeWhile a passthrough
  const passthrough = operators.takeWhile(() => true);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  // A predicate that never holds is expected to end the chain on the first value
  passesCloseAndEnd(operators.takeWhile(() => false));

  it('emits values while a predicate passes for all values', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = jest.fn();

    const belowTwo$ = operators.takeWhile((x: any) => x < 2)(input$);
    belowTwo$(sink);
    emit(1);
    emit(2);

    // 1 passes, 2 fails the predicate and ends the stream without being emitted
    expect(sink.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});
1137
describe('takeLast', () => {
  const ending = operators.takeLast(0);
  passesCloseAndEnd(ending);

  it('emits the last max values of an ended source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const log: Signal<any>[] = [];

    let talkback: TalkbackFn;

    // Records every signal and pulls on Start as well as after each Push
    const sink: Sink<any> = signal => {
      log.push(signal);
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        talkback(TalkbackKind.Pull);
      } else {
        talkback!(TalkbackKind.Pull);
      }
    };

    operators.takeLast(1)(input$)(sink);

    emit(1);
    emit(2);

    // Nothing, not even Start, is forwarded before the source has ended
    expect(log.length).toBe(0);
    finish();

    // Only the last value is replayed once the source is done
    expect(log).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
  });
});
1167
describe('throttle', () => {
  const passthrough = operators.throttle(() => 0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('should ignore emissions for a period of time after a value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const throttled$ = operators.throttle(() => 100)(input$);
    sinks.forEach(onValue)(throttled$);

    // The first value goes straight through
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    // 50ms later a second value is still suppressed
    emit(2);
    expect(onValue).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    // After the full 100ms values are let through again
    emit(3);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});