1import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
2import { push, start } from '../helpers';
3
4import {
5 passesPassivePull,
6 passesActivePush,
7 passesSinkClose,
8 passesSourceEnd,
9 passesSingleStart,
10 passesStrictEnd,
11 passesSourcePushThenEnd,
12 passesAsyncSequence,
13 passesCloseAndEnd,
14} from './compliance';
15
16import * as sources from '../sources';
17import * as sinks from '../sinks';
18import * as operators from '../operators';
19
20beforeEach(() => {
21 jest.useFakeTimers();
22});
23
24describe('buffer', () => {
25 const valueThenNever: Source<any> = sink =>
26 sink(
27 start(signal => {
28 if (signal === TalkbackKind.Pull) sink(push(null));
29 })
30 );
31
32 const noop = operators.buffer(valueThenNever);
33
34 passesPassivePull(noop, [0]);
35 passesActivePush(noop, [0]);
36 passesSinkClose(noop);
37 passesSourcePushThenEnd(noop, [0]);
38 passesSingleStart(noop);
39 passesStrictEnd(noop);
40
41 it('emits batches of input values when a notifier emits', () => {
42 const { source: notifier$, next: notify } = sources.makeSubject();
43 const { source: input$, next } = sources.makeSubject();
44 const fn = jest.fn();
45
46 sinks.forEach(fn)(operators.buffer(notifier$)(input$));
47
48 next(1);
49 next(2);
50 expect(fn).not.toHaveBeenCalled();
51
52 notify(null);
53 expect(fn).toHaveBeenCalledWith([1, 2]);
54
55 next(3);
56 notify(null);
57 expect(fn).toHaveBeenCalledWith([3]);
58 });
59});
60
61describe('concatMap', () => {
62 const noop = operators.concatMap(x => sources.fromValue(x));
63 passesPassivePull(noop);
64 passesActivePush(noop);
65 passesSinkClose(noop);
66 passesSourcePushThenEnd(noop);
67 passesSingleStart(noop);
68 passesStrictEnd(noop);
69 passesAsyncSequence(noop);
70
71 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
72 it('emits values from each flattened synchronous source', () => {
73 const { source, next, complete } = sources.makeSubject<number>();
74 const fn = jest.fn();
75
76 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
77
78 next(1);
79 next(3);
80 complete();
81
82 expect(fn).toHaveBeenCalledTimes(6);
83 expect(fn.mock.calls).toEqual([
84 [start(expect.any(Function))],
85 [push(1)],
86 [push(2)],
87 [push(3)],
88 [push(4)],
89 [SignalKind.End],
90 ]);
91 });
92
93 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
94 it('lets inner sources finish when outer source ends', () => {
95 const signals: Signal<any>[] = [];
96 const teardown = jest.fn();
97 const fn = (signal: Signal<any>) => {
98 signals.push(signal);
99 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
100 signal[0](TalkbackKind.Pull);
101 signal[0](TalkbackKind.Close);
102 }
103 };
104
105 operators.concatMap(() => {
106 return sources.make(() => teardown);
107 })(sources.fromValue(null))(fn);
108
109 expect(teardown).toHaveBeenCalled();
110 expect(signals).toEqual([start(expect.any(Function))]);
111 });
112
113 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
114 it('emits values from each flattened asynchronous source, one at a time', () => {
115 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
116 const fn = jest.fn();
117
118 sinks.forEach(fn)(
119 operators.concatMap((x: number) => {
120 return operators.delay(5)(sources.fromArray([x, x * 2]));
121 })(source)
122 );
123
124 jest.advanceTimersByTime(14);
125 expect(fn.mock.calls).toEqual([[1], [2]]);
126
127 jest.runAllTimers();
128 expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
129 });
130
131 it('works for fully asynchronous sources', () => {
132 const fn = jest.fn();
133
134 sinks.forEach(fn)(
135 operators.concatMap(() => {
136 return sources.make(observer => {
137 setTimeout(() => observer.next(1));
138 return () => {};
139 });
140 })(sources.fromValue(null))
141 );
142
143 jest.runAllTimers();
144 expect(fn).toHaveBeenCalledWith(1);
145 });
146
147 it('emits synchronous values in order', () => {
148 const values: any[] = [];
149
150 sinks.forEach(x => values.push(x))(
151 operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
152 );
153
154 expect(values).toEqual([1, 2, 3, 4]);
155 });
156});
157
158describe('debounce', () => {
159 const noop = operators.debounce(() => 0);
160 passesPassivePull(noop);
161 passesActivePush(noop);
162 passesSinkClose(noop);
163 passesSourceEnd(noop);
164 passesSingleStart(noop);
165 passesStrictEnd(noop);
166 passesAsyncSequence(noop);
167
168 it('waits for a specified amount of silence before emitting the last value', () => {
169 const { source, next } = sources.makeSubject<number>();
170 const fn = jest.fn();
171
172 sinks.forEach(fn)(operators.debounce(() => 100)(source));
173
174 next(1);
175 jest.advanceTimersByTime(50);
176 expect(fn).not.toHaveBeenCalled();
177
178 next(2);
179 jest.advanceTimersByTime(99);
180 expect(fn).not.toHaveBeenCalled();
181
182 jest.advanceTimersByTime(1);
183 expect(fn).toHaveBeenCalledWith(2);
184 });
185
186 it('emits debounced value with delayed End signal', () => {
187 const { source, next, complete } = sources.makeSubject<number>();
188 const fn = jest.fn();
189
190 sinks.forEach(fn)(operators.debounce(() => 100)(source));
191
192 next(1);
193 complete();
194 jest.advanceTimersByTime(100);
195 expect(fn).toHaveBeenCalled();
196 });
197});
198
199describe('delay', () => {
200 const noop = operators.delay(0);
201 passesPassivePull(noop);
202 passesActivePush(noop);
203 passesSinkClose(noop);
204 passesSourceEnd(noop);
205 passesSingleStart(noop);
206 passesAsyncSequence(noop);
207
208 it('delays outputs by a specified delay timeout value', () => {
209 const { source, next } = sources.makeSubject();
210 const fn = jest.fn();
211
212 sinks.forEach(fn)(operators.delay(100)(source));
213
214 next(1);
215 expect(fn).not.toHaveBeenCalled();
216
217 jest.advanceTimersByTime(100);
218 expect(fn).toHaveBeenCalledWith(1);
219 });
220});
221
222describe('filter', () => {
223 const noop = operators.filter(() => true);
224 passesPassivePull(noop);
225 passesActivePush(noop);
226 passesSinkClose(noop);
227 passesSourceEnd(noop);
228 passesSingleStart(noop);
229 passesAsyncSequence(noop);
230
231 it('prevents emissions for which a predicate fails', () => {
232 const { source, next } = sources.makeSubject();
233 const fn = jest.fn();
234
235 sinks.forEach(fn)(operators.filter(x => !!x)(source));
236
237 next(false);
238 expect(fn).not.toHaveBeenCalled();
239
240 next(true);
241 expect(fn).toHaveBeenCalledWith(true);
242 });
243});
244
245describe('map', () => {
246 const noop = operators.map(x => x);
247 passesPassivePull(noop);
248 passesActivePush(noop);
249 passesSinkClose(noop);
250 passesSourceEnd(noop);
251 passesSingleStart(noop);
252 passesAsyncSequence(noop);
253
254 it('maps over values given a transform function', () => {
255 const { source, next } = sources.makeSubject<number>();
256 const fn = jest.fn();
257
258 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));
259
260 next(1);
261 expect(fn).toHaveBeenCalledWith(2);
262 });
263});
264
265describe('mergeMap', () => {
266 const noop = operators.mergeMap(x => sources.fromValue(x));
267 passesPassivePull(noop);
268 passesActivePush(noop);
269 passesSinkClose(noop);
270 passesSourcePushThenEnd(noop);
271 passesSingleStart(noop);
272 passesStrictEnd(noop);
273 passesAsyncSequence(noop);
274
275 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
276 it('emits values from each flattened synchronous source', () => {
277 const { source, next, complete } = sources.makeSubject<number>();
278 const fn = jest.fn();
279
280 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
281
282 next(1);
283 next(3);
284 complete();
285
286 expect(fn.mock.calls).toEqual([
287 [start(expect.any(Function))],
288 [push(1)],
289 [push(2)],
290 [push(3)],
291 [push(4)],
292 [SignalKind.End],
293 ]);
294 });
295
296 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
297 it('lets inner sources finish when outer source ends', () => {
298 const values: Signal<any>[] = [];
299 const teardown = jest.fn();
300 const fn = (signal: Signal<any>) => {
301 values.push(signal);
302 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
303 signal[0](TalkbackKind.Pull);
304 signal[0](TalkbackKind.Close);
305 }
306 };
307
308 operators.mergeMap(() => {
309 return sources.make(() => teardown);
310 })(sources.fromValue(null))(fn);
311
312 expect(teardown).toHaveBeenCalled();
313 expect(values).toEqual([start(expect.any(Function))]);
314 });
315
316 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
317 it('emits values from each flattened asynchronous source simultaneously', () => {
318 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
319 const fn = jest.fn();
320
321 sinks.forEach(fn)(
322 operators.mergeMap((x: number) => {
323 return operators.delay(5)(sources.fromArray([x, x * 2]));
324 })(source)
325 );
326
327 jest.runAllTimers();
328 expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
329 });
330
331 it('emits synchronous values in order', () => {
332 const values: any[] = [];
333
334 sinks.forEach(x => values.push(x))(
335 operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
336 );
337
338 expect(values).toEqual([1, 2, 3, 4]);
339 });
340});
341
342describe('onEnd', () => {
343 const noop = operators.onEnd(() => {});
344 passesPassivePull(noop);
345 passesActivePush(noop);
346 passesSinkClose(noop);
347 passesSourceEnd(noop);
348 passesStrictEnd(noop);
349 passesSingleStart(noop);
350 passesAsyncSequence(noop);
351
352 it('calls a callback when the source ends', () => {
353 const { source, next, complete } = sources.makeSubject<any>();
354 const fn = jest.fn();
355
356 sinks.forEach(() => {})(operators.onEnd(fn)(source));
357
358 next(null);
359 expect(fn).not.toHaveBeenCalled();
360
361 complete();
362 expect(fn).toHaveBeenCalled();
363 });
364});
365
366describe('onPush', () => {
367 const noop = operators.onPush(() => {});
368 passesPassivePull(noop);
369 passesActivePush(noop);
370 passesSinkClose(noop);
371 passesSourceEnd(noop);
372 passesStrictEnd(noop);
373 passesSingleStart(noop);
374 passesAsyncSequence(noop);
375
376 it('calls a callback when the source emits', () => {
377 const { source, next } = sources.makeSubject<number>();
378 const fn = jest.fn();
379
380 sinks.forEach(() => {})(operators.onPush(fn)(source));
381
382 next(1);
383 expect(fn).toHaveBeenCalledWith(1);
384 next(2);
385 expect(fn).toHaveBeenCalledWith(2);
386 });
387
388 it('is the same as `tap`', () => {
389 expect(operators.onPush).toBe(operators.tap);
390 });
391});
392
393describe('onStart', () => {
394 const noop = operators.onStart(() => {});
395 passesPassivePull(noop);
396 passesActivePush(noop);
397 passesSinkClose(noop);
398 passesSourceEnd(noop);
399 passesSingleStart(noop);
400 passesAsyncSequence(noop);
401
402 it('is called when the source starts', () => {
403 let sink: Sink<any>;
404
405 const fn = jest.fn();
406 const source: Source<any> = _sink => {
407 sink = _sink;
408 };
409
410 sinks.forEach(() => {})(operators.onStart(fn)(source));
411
412 expect(fn).not.toHaveBeenCalled();
413
414 sink!(start(() => {}));
415 expect(fn).toHaveBeenCalled();
416 });
417});
418
419describe('sample', () => {
420 const valueThenNever: Source<any> = sink =>
421 sink(
422 start(signal => {
423 if (signal === TalkbackKind.Pull) sink(push(null));
424 })
425 );
426
427 const noop = operators.sample(valueThenNever);
428
429 passesPassivePull(noop);
430 passesActivePush(noop);
431 passesSinkClose(noop);
432 passesSourcePushThenEnd(noop);
433 passesSingleStart(noop);
434 passesStrictEnd(noop);
435
436 it('emits the latest value when a notifier source emits', () => {
437 const { source: notifier$, next: notify } = sources.makeSubject();
438 const { source: input$, next } = sources.makeSubject();
439 const fn = jest.fn();
440
441 sinks.forEach(fn)(operators.sample(notifier$)(input$));
442
443 next(1);
444 next(2);
445 expect(fn).not.toHaveBeenCalled();
446
447 notify(null);
448 expect(fn).toHaveBeenCalledWith(2);
449 });
450});
451
452describe('scan', () => {
453 const noop = operators.scan<any, any>((_acc, x) => x, null);
454 passesPassivePull(noop);
455 passesActivePush(noop);
456 passesSinkClose(noop);
457 passesSourceEnd(noop);
458 passesSingleStart(noop);
459 passesAsyncSequence(noop);
460
461 it('folds values continuously with a reducer and initial value', () => {
462 const { source: input$, next } = sources.makeSubject<number>();
463 const fn = jest.fn();
464
465 const reducer = (acc: number, x: number) => acc + x;
466 sinks.forEach(fn)(operators.scan(reducer, 0)(input$));
467
468 next(1);
469 expect(fn).toHaveBeenCalledWith(1);
470 next(2);
471 expect(fn).toHaveBeenCalledWith(3);
472 });
473});
474
475describe('share', () => {
476 const noop = operators.share;
477 passesPassivePull(noop);
478 passesActivePush(noop);
479 passesSinkClose(noop);
480 passesSourceEnd(noop);
481 passesSingleStart(noop);
482 passesStrictEnd(noop);
483 passesAsyncSequence(noop);
484
485 it('shares output values between sinks', () => {
486 let onPush = () => {};
487
488 const source: Source<any> = operators.share(sink => {
489 sink(start(() => {}));
490 onPush = () => {
491 sink(push([0]));
492 sink(SignalKind.End);
493 };
494 });
495
496 const fnA = jest.fn();
497 const fnB = jest.fn();
498
499 sinks.forEach(fnA)(source);
500 sinks.forEach(fnB)(source);
501 onPush();
502
503 expect(fnA).toHaveBeenCalledWith([0]);
504 expect(fnB).toHaveBeenCalledWith([0]);
505 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
506 });
507});
508
509describe('skip', () => {
510 const noop = operators.skip(0);
511 passesPassivePull(noop);
512 passesActivePush(noop);
513 passesSinkClose(noop);
514 passesSourceEnd(noop);
515 passesSingleStart(noop);
516 passesAsyncSequence(noop);
517
518 it('skips a number of values before emitting normally', () => {
519 const { source, next } = sources.makeSubject<number>();
520 const fn = jest.fn();
521
522 sinks.forEach(fn)(operators.skip(1)(source));
523
524 next(1);
525 expect(fn).not.toHaveBeenCalled();
526 next(2);
527 expect(fn).toHaveBeenCalledWith(2);
528 });
529});
530
531describe('skipUntil', () => {
532 const noop = operators.skipUntil(sources.fromValue(null));
533 passesPassivePull(noop);
534 passesActivePush(noop);
535 passesSinkClose(noop);
536 passesSourceEnd(noop);
537 passesSingleStart(noop);
538 passesAsyncSequence(noop);
539 passesStrictEnd(noop);
540
541 it('skips values until the notifier source emits', () => {
542 const { source: notifier$, next: notify } = sources.makeSubject();
543 const { source: input$, next } = sources.makeSubject<number>();
544 const fn = jest.fn();
545
546 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));
547
548 next(1);
549 expect(fn).not.toHaveBeenCalled();
550 notify(null);
551 next(2);
552 expect(fn).toHaveBeenCalledWith(2);
553 });
554});
555
556describe('skipWhile', () => {
557 const noop = operators.skipWhile(() => false);
558 passesPassivePull(noop);
559 passesActivePush(noop);
560 passesSinkClose(noop);
561 passesSourceEnd(noop);
562 passesSingleStart(noop);
563 passesAsyncSequence(noop);
564
565 it('skips values until one fails a predicate', () => {
566 const { source, next } = sources.makeSubject<number>();
567 const fn = jest.fn();
568
569 sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source));
570
571 next(1);
572 expect(fn).not.toHaveBeenCalled();
573 next(2);
574 expect(fn).toHaveBeenCalledWith(2);
575 });
576});
577
578describe('switchMap', () => {
579 const noop = operators.switchMap(x => sources.fromValue(x));
580 passesPassivePull(noop);
581 passesActivePush(noop);
582 passesSinkClose(noop);
583 passesSourcePushThenEnd(noop);
584 passesSingleStart(noop);
585 passesStrictEnd(noop);
586 passesAsyncSequence(noop);
587
588 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
589 it('emits values from each flattened synchronous source', () => {
590 const { source, next, complete } = sources.makeSubject<number>();
591 const fn = jest.fn();
592
593 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
594
595 next(1);
596 next(3);
597 complete();
598
599 expect(fn).toHaveBeenCalledTimes(6);
600 expect(fn.mock.calls).toEqual([
601 [start(expect.any(Function))],
602 [push(1)],
603 [push(2)],
604 [push(3)],
605 [push(4)],
606 [SignalKind.End],
607 ]);
608 });
609
610 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
611 it('lets inner sources finish when outer source ends', () => {
612 const signals: Signal<any>[] = [];
613 const teardown = jest.fn();
614 const fn = (signal: Signal<any>) => {
615 signals.push(signal);
616 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
617 signal[0](TalkbackKind.Pull);
618 signal[0](TalkbackKind.Close);
619 }
620 };
621
622 operators.switchMap(() => {
623 return sources.make(() => teardown);
624 })(sources.fromValue(null))(fn);
625
626 expect(teardown).toHaveBeenCalled();
627 expect(signals).toEqual([start(expect.any(Function))]);
628 });
629
630 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
631 it('emits values from each flattened asynchronous source, one at a time', () => {
632 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
633 const fn = jest.fn();
634
635 sinks.forEach(fn)(
636 operators.switchMap((x: number) =>
637 operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
638 )(source)
639 );
640
641 jest.runAllTimers();
642 expect(fn.mock.calls).toEqual([[1], [10], [20]]);
643 });
644});
645
646describe('take', () => {
647 const noop = operators.take(10);
648 passesPassivePull(noop);
649 passesActivePush(noop);
650 passesSinkClose(noop);
651 passesSourceEnd(noop);
652 passesSingleStart(noop);
653 passesStrictEnd(noop);
654 passesAsyncSequence(noop);
655
656 passesCloseAndEnd(operators.take(0));
657
658 it('emits values until a maximum is reached', () => {
659 const { source, next } = sources.makeSubject<number>();
660 const fn = jest.fn();
661
662 operators.take(1)(source)(fn);
663 next(1);
664
665 expect(fn).toHaveBeenCalledTimes(3);
666 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
667 });
668});
669
670describe('takeUntil', () => {
671 const noop = operators.takeUntil(sources.never);
672 passesPassivePull(noop);
673 passesActivePush(noop);
674 passesSinkClose(noop);
675 passesSourcePushThenEnd(noop);
676 passesSingleStart(noop);
677 passesStrictEnd(noop);
678 passesAsyncSequence(noop);
679
680 const ending = operators.takeUntil(sources.fromValue(null));
681 passesCloseAndEnd(ending);
682
683 it('emits values until a notifier emits', () => {
684 const { source: notifier$, next: notify } = sources.makeSubject<any>();
685 const { source: input$, next } = sources.makeSubject<number>();
686 const fn = jest.fn();
687
688 operators.takeUntil(notifier$)(input$)(fn);
689 next(1);
690
691 expect(fn).toHaveBeenCalledTimes(2);
692 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);
693
694 notify(null);
695 expect(fn).toHaveBeenCalledTimes(3);
696 expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
697 });
698});
699
700describe('takeWhile', () => {
701 const noop = operators.takeWhile(() => true);
702 passesPassivePull(noop);
703 passesActivePush(noop);
704 passesSinkClose(noop);
705 passesSourceEnd(noop);
706 passesSingleStart(noop);
707 passesAsyncSequence(noop);
708
709 const ending = operators.takeWhile(() => false);
710 passesCloseAndEnd(ending);
711
712 it('emits values while a predicate passes for all values', () => {
713 const { source, next } = sources.makeSubject<number>();
714 const fn = jest.fn();
715
716 operators.takeWhile((x: any) => x < 2)(source)(fn);
717 next(1);
718 next(2);
719
720 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
721 });
722});
723
724describe('takeLast', () => {
725 passesCloseAndEnd(operators.takeLast(0));
726
727 it('emits the last max values of an ended source', () => {
728 const { source, next, complete } = sources.makeSubject<number>();
729 const signals: Signal<any>[] = [];
730
731 let talkback: TalkbackFn;
732
733 operators.takeLast(1)(source)(signal => {
734 signals.push(signal);
735 if (signal === SignalKind.End) {
736 /*noop*/
737 } else if (signal.tag === SignalKind.Start) {
738 (talkback = signal[0])(TalkbackKind.Pull);
739 } else {
740 talkback!(TalkbackKind.Pull);
741 }
742 });
743
744 next(1);
745 next(2);
746
747 expect(signals.length).toBe(0);
748 complete();
749
750 expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
751 });
752});
753
754describe('throttle', () => {
755 const noop = operators.throttle(() => 0);
756 passesPassivePull(noop);
757 passesActivePush(noop);
758 passesSinkClose(noop);
759 passesSourceEnd(noop);
760 passesSingleStart(noop);
761 passesAsyncSequence(noop);
762
763 it('should ignore emissions for a period of time after a value', () => {
764 const { source, next } = sources.makeSubject<number>();
765 const fn = jest.fn();
766
767 sinks.forEach(fn)(operators.throttle(() => 100)(source));
768
769 next(1);
770 expect(fn).toHaveBeenCalledWith(1);
771 jest.advanceTimersByTime(50);
772
773 next(2);
774 expect(fn).toHaveBeenCalledTimes(1);
775 jest.advanceTimersByTime(50);
776
777 next(3);
778 expect(fn).toHaveBeenCalledWith(3);
779 });
780});