1import { describe, it, expect, beforeEach, vi } from 'vitest';
2
3import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
4import { push, start } from '../helpers';
5
6import {
7 passesPassivePull,
8 passesActivePush,
9 passesSinkClose,
10 passesSourceEnd,
11 passesSingleStart,
12 passesStrictEnd,
13 passesSourcePushThenEnd,
14 passesAsyncSequence,
15 passesCloseAndEnd,
16} from './compliance';
17
18import * as sources from '../sources';
19import * as sinks from '../sinks';
20import * as operators from '../operators';
21
// Install vitest's fake timers before every test so the timer-driven tests
// below can step time explicitly (`vi.advanceTimersByTime`, `vi.runAllTimers`).
beforeEach(function installFakeTimers() {
  vi.useFakeTimers();
});
25
describe('buffer', () => {
  // Notifier used for the compliance checks: it answers every Pull with a
  // `null` push and never sends an End signal itself.
  const valueThenNever: Source<any> = sink => {
    sink(
      start(signal => {
        if (signal !== TalkbackKind.Pull) return;
        sink(push(null));
      })
    );
  };

  const bufferedByNotifier = operators.buffer(valueThenNever);

  // Shared compliance checks from ./compliance.
  passesPassivePull(bufferedByNotifier, [0]);
  passesActivePush(bufferedByNotifier, [0]);
  passesSinkClose(bufferedByNotifier);
  passesSourcePushThenEnd(bufferedByNotifier, [0]);
  passesSingleStart(bufferedByNotifier);
  passesStrictEnd(bufferedByNotifier);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const onBatch = vi.fn();

    const batched$ = operators.buffer(notifier$)(input$);
    sinks.forEach(onBatch)(batched$);

    // Values are held back until the notifier fires.
    emit(1);
    emit(2);
    expect(onBatch).not.toHaveBeenCalled();

    notify(null);
    expect(onBatch).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains values received since the last flush.
    emit(3);
    notify(null);
    expect(onBatch).toHaveBeenCalledWith([3]);
  });
});
62
describe('concatMap', () => {
  // Maps every value to a single-value source for the compliance checks.
  const flattenSingle = operators.concatMap(x => sources.fromValue(x));

  // Shared compliance checks from ./compliance.
  passesPassivePull(flattenSingle);
  passesActivePush(flattenSingle);
  passesSinkClose(flattenSingle);
  passesSourcePushThenEnd(flattenSingle);
  passesSingleStart(flattenSingle);
  passesStrictEnd(flattenSingle);
  passesAsyncSequence(flattenSingle);

  // Synchronous case: concatMap behaves the same as mergeMap & switchMap here.
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sink = vi.fn();
    const toPair = (x: number) => sources.fromArray([x, x + 1]);

    operators.concatMap(toPair)(outer$)(sink);

    emit(1);
    emit(3);
    finish();

    const expectedSignals = [
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ];

    expect(sink).toHaveBeenCalledTimes(6);
    expect(sink.mock.calls).toEqual(expectedSignals);
  });

  // Synchronous case: concatMap behaves the same as mergeMap & switchMap here.
  it('lets inner sources finish when outer source ends', () => {
    const received: Signal<any>[] = [];
    const teardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes immediately.
    const sink = (signal: Signal<any>) => {
      received.push(signal);
      if (signal === SignalKind.End || signal.tag !== SignalKind.Start) return;
      signal[0](TalkbackKind.Pull);
      signal[0](TalkbackKind.Close);
    };

    const flattened$ = operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null));

    flattened$(sink);

    expect(teardown).toHaveBeenCalled();
    expect(received).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: concatMap behaves differently than mergeMap & switchMap.
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const delayedOuter$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = vi.fn();

    sinks.forEach(onValue)(
      operators.concatMap((x: number) => {
        const inner$ = sources.fromArray([x, x * 2]);
        return operators.delay(5)(inner$);
      })(delayedOuter$)
    );

    // After 14ms only the first inner source's values have been delivered.
    vi.advanceTimersByTime(14);
    expect(onValue.mock.calls).toEqual([[1], [2]]);

    vi.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const onValue = vi.fn();

    const flattened$ = operators.concatMap(() => {
      return sources.make(observer => {
        // The inner source only emits once a timer has fired.
        setTimeout(() => observer.next(1));
        return () => {};
      });
    })(sources.fromValue(null));

    sinks.forEach(onValue)(flattened$);

    vi.runAllTimers();
    expect(onValue).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const collected: any[] = [];
    const first$ = sources.fromArray([1, 2]);
    const second$ = sources.fromArray([3, 4]);

    sinks.forEach(x => collected.push(x))(operators.concat([first$, second$]));

    expect(collected).toEqual([1, 2, 3, 4]);
  });
});
159
describe('debounce', () => {
  // Debounce with a zero-millisecond window, used for the compliance checks.
  const noop = operators.debounce(() => 0);

  // Shared compliance checks from ./compliance.
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    vi.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    // A new value restarts the 100ms window, so 99ms later nothing is out yet.
    next(2);
    vi.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    complete();

    // Completing the source must neither flush the pending value early nor
    // drop it: it is still held back for the full debounce window.
    vi.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    // Once the window elapses, exactly the pending value is delivered.
    vi.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledTimes(1);
    expect(fn).toHaveBeenCalledWith(1);
  });
});
200
describe('delay', () => {
  // Zero-millisecond delay, used for the compliance checks.
  const zeroDelay = operators.delay(0);

  // Shared compliance checks from ./compliance.
  passesPassivePull(zeroDelay);
  passesActivePush(zeroDelay);
  passesSinkClose(zeroDelay);
  passesSourceEnd(zeroDelay);
  passesSingleStart(zeroDelay);
  passesAsyncSequence(zeroDelay);

  it('delays outputs by a specified delay timeout value', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = vi.fn();

    const delayed$ = operators.delay(100)(input$);
    sinks.forEach(onValue)(delayed$);

    // Nothing is forwarded synchronously.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    vi.advanceTimersByTime(100);
    expect(onValue).toHaveBeenCalledWith(1);
  });
});
223
describe('filter', () => {
  // A predicate that accepts everything, used for the compliance checks.
  const keepAll = operators.filter(() => true);

  // Shared compliance checks from ./compliance.
  passesPassivePull(keepAll);
  passesActivePush(keepAll);
  passesSinkClose(keepAll);
  passesSourceEnd(keepAll);
  passesSingleStart(keepAll);
  passesAsyncSequence(keepAll);

  it('prevents emissions for which a predicate fails', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = vi.fn();

    const truthyOnly$ = operators.filter(x => !!x)(input$);
    sinks.forEach(onValue)(truthyOnly$);

    // A falsy value is rejected by the predicate...
    emit(false);
    expect(onValue).not.toHaveBeenCalled();

    // ...while a truthy one passes through unchanged.
    emit(true);
    expect(onValue).toHaveBeenCalledWith(true);
  });
});
246
describe('map', () => {
  // Identity mapping, used for the compliance checks.
  const identityMap = operators.map(x => x);

  // Shared compliance checks from ./compliance.
  passesPassivePull(identityMap);
  passesActivePush(identityMap);
  passesSinkClose(identityMap);
  passesSourceEnd(identityMap);
  passesSingleStart(identityMap);
  passesAsyncSequence(identityMap);

  it('maps over values given a transform function', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();
    const increment = (x: number) => x + 1;

    const incremented$ = operators.map(increment)(input$);
    sinks.forEach(onValue)(incremented$);

    emit(1);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
266
describe('mergeMap', () => {
  // Maps every value to a single-value source for the compliance checks.
  const noop = operators.mergeMap(x => sources.fromValue(x));

  // Shared compliance checks from ./compliance.
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    // Same explicit count assertion as the parallel concatMap/switchMap tests;
    // it gives a clearer failure message than the deep comparison below.
    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    // Named `signals` (not `values`) to match the concatMap/switchMap tests:
    // the array records raw signals, not just pushed values.
    const signals: Signal<any>[] = [];
    const teardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes immediately.
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    // Values from both inner sources interleave instead of running in sequence.
    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});
343
describe('onEnd', () => {
  // onEnd with an empty callback, used for the compliance checks.
  const withEmptyCallback = operators.onEnd(() => {});

  // Shared compliance checks from ./compliance.
  passesPassivePull(withEmptyCallback);
  passesActivePush(withEmptyCallback);
  passesSinkClose(withEmptyCallback);
  passesSourceEnd(withEmptyCallback);
  passesStrictEnd(withEmptyCallback);
  passesSingleStart(withEmptyCallback);
  passesAsyncSequence(withEmptyCallback);

  it('calls a callback when the source ends', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<any>();
    const onEndSpy = vi.fn();

    const observed$ = operators.onEnd(onEndSpy)(input$);
    sinks.forEach(() => {})(observed$);

    // A plain value must not trigger the callback.
    emit(null);
    expect(onEndSpy).not.toHaveBeenCalled();

    finish();
    expect(onEndSpy).toHaveBeenCalled();
  });
});
367
describe('onPush', () => {
  // onPush with an empty callback, used for the compliance checks.
  const withEmptyCallback = operators.onPush(() => {});

  // Shared compliance checks from ./compliance.
  passesPassivePull(withEmptyCallback);
  passesActivePush(withEmptyCallback);
  passesSinkClose(withEmptyCallback);
  passesSourceEnd(withEmptyCallback);
  passesStrictEnd(withEmptyCallback);
  passesSingleStart(withEmptyCallback);
  passesAsyncSequence(withEmptyCallback);

  it('calls a callback when the source emits', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onPushSpy = vi.fn();

    const observed$ = operators.onPush(onPushSpy)(input$);
    sinks.forEach(() => {})(observed$);

    emit(1);
    expect(onPushSpy).toHaveBeenCalledWith(1);
    emit(2);
    expect(onPushSpy).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // `tap` and `onPush` must be the same function reference.
    const { onPush, tap } = operators;
    expect(onPush).toBe(tap);
  });
});
394
describe('onStart', () => {
  // onStart with an empty callback, used for the compliance checks.
  const withEmptyCallback = operators.onStart(() => {});

  // Shared compliance checks from ./compliance.
  passesPassivePull(withEmptyCallback);
  passesActivePush(withEmptyCallback);
  passesSinkClose(withEmptyCallback);
  passesSourceEnd(withEmptyCallback);
  passesSingleStart(withEmptyCallback);
  passesAsyncSequence(withEmptyCallback);

  it('is called when the source starts', () => {
    let capturedSink: Sink<any>;
    const onStartSpy = vi.fn();

    // A source that only stores its sink, so the test decides when Start is sent.
    const manualSource: Source<any> = downstream => {
      capturedSink = downstream;
    };

    const observed$ = operators.onStart(onStartSpy)(manualSource);
    sinks.forEach(() => {})(observed$);

    // Subscribing alone does not count as starting.
    expect(onStartSpy).not.toHaveBeenCalled();

    capturedSink!(start(() => {}));
    expect(onStartSpy).toHaveBeenCalled();
  });
});
420
describe('sample', () => {
  // Notifier used for the compliance checks: it answers every Pull with a
  // `null` push and never sends an End signal itself.
  const valueThenNever: Source<any> = sink => {
    sink(
      start(signal => {
        if (signal !== TalkbackKind.Pull) return;
        sink(push(null));
      })
    );
  };

  const sampledByNotifier = operators.sample(valueThenNever);

  // Shared compliance checks from ./compliance.
  passesPassivePull(sampledByNotifier);
  passesActivePush(sampledByNotifier);
  passesSinkClose(sampledByNotifier);
  passesSourcePushThenEnd(sampledByNotifier);
  passesSingleStart(sampledByNotifier);
  passesStrictEnd(sampledByNotifier);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const onSample = vi.fn();

    const sampled$ = operators.sample(notifier$)(input$);
    sinks.forEach(onSample)(sampled$);

    // Input values alone produce no output.
    emit(1);
    emit(2);
    expect(onSample).not.toHaveBeenCalled();

    // The notifier releases only the most recent input value.
    notify(null);
    expect(onSample).toHaveBeenCalledWith(2);
  });
});
453
describe('scan', () => {
  // Reducer that always returns the incoming value, used for the compliance checks.
  const latestValue = operators.scan<any, any>((_acc, x) => x, null);

  // Shared compliance checks from ./compliance.
  passesPassivePull(latestValue);
  passesActivePush(latestValue);
  passesSinkClose(latestValue);
  passesSourceEnd(latestValue);
  passesSingleStart(latestValue);
  passesAsyncSequence(latestValue);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();
    const sum = (acc: number, x: number) => acc + x;

    const runningTotal$ = operators.scan(sum, 0)(input$);
    sinks.forEach(onValue)(runningTotal$);

    // 0 + 1
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);
    // 1 + 2
    emit(2);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});
476
describe('share', () => {
  // `share` is itself an operator, so it is passed to the checks directly.
  const shareOperator = operators.share;

  // Shared compliance checks from ./compliance.
  passesPassivePull(shareOperator);
  passesActivePush(shareOperator);
  passesSinkClose(shareOperator);
  passesSourceEnd(shareOperator);
  passesSingleStart(shareOperator);
  passesStrictEnd(shareOperator);
  passesAsyncSequence(shareOperator);

  it('shares output values between sinks', () => {
    // Replaced once the upstream source is subscribed to; calling it then
    // pushes a single array value and ends the source.
    let emitAndEnd = () => {};

    const upstream: Source<any> = sink => {
      sink(start(() => {}));
      emitAndEnd = () => {
        sink(push([0]));
        sink(SignalKind.End);
      };
    };
    const shared$: Source<any> = operators.share(upstream);

    const subscriberA = vi.fn();
    const subscriberB = vi.fn();

    sinks.forEach(subscriberA)(shared$);
    sinks.forEach(subscriberB)(shared$);
    emitAndEnd();

    expect(subscriberA).toHaveBeenCalledWith([0]);
    expect(subscriberB).toHaveBeenCalledWith([0]);

    // Both subscribers must receive the very same array instance.
    const valueSeenByA = subscriberA.mock.calls[0][0];
    const valueSeenByB = subscriberB.mock.calls[0][0];
    expect(valueSeenByA).toBe(valueSeenByB);
  });
});
510
describe('skip', () => {
  // Skipping zero values, used for the compliance checks.
  const skipNone = operators.skip(0);

  // Shared compliance checks from ./compliance.
  passesPassivePull(skipNone);
  passesActivePush(skipNone);
  passesSinkClose(skipNone);
  passesSourceEnd(skipNone);
  passesSingleStart(skipNone);
  passesAsyncSequence(skipNone);

  it('skips a number of values before emitting normally', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const afterFirst$ = operators.skip(1)(input$);
    sinks.forEach(onValue)(afterFirst$);

    // The first value is swallowed...
    emit(1);
    expect(onValue).not.toHaveBeenCalled();
    // ...and the second one is delivered.
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
532
describe('skipUntil', () => {
  // Uses a single-value source as the notifier for the compliance checks.
  const skipUntilValue = operators.skipUntil(sources.fromValue(null));

  // Shared compliance checks from ./compliance.
  passesPassivePull(skipUntilValue);
  passesActivePush(skipUntilValue);
  passesSinkClose(skipUntilValue);
  passesSourceEnd(skipUntilValue);
  passesSingleStart(skipUntilValue);
  passesAsyncSequence(skipUntilValue);
  passesStrictEnd(skipUntilValue);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const gated$ = operators.skipUntil(notifier$)(input$);
    sinks.forEach(onValue)(gated$);

    // Before the notifier fires, input values are dropped.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    // After it fires, values pass through.
    notify(null);
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
557
describe('skipWhile', () => {
  // A predicate that is always false, used for the compliance checks.
  const skipNothing = operators.skipWhile(() => false);

  // Shared compliance checks from ./compliance.
  passesPassivePull(skipNothing);
  passesActivePush(skipNothing);
  passesSinkClose(skipNothing);
  passesSourceEnd(skipNothing);
  passesSingleStart(skipNothing);
  passesAsyncSequence(skipNothing);

  it('skips values until one fails a predicate', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();
    const atMostOne = (x: any) => x <= 1;

    const afterSkipping$ = operators.skipWhile(atMostOne)(input$);
    sinks.forEach(onValue)(afterSkipping$);

    // 1 satisfies the predicate and is skipped.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();
    // 2 fails the predicate and is delivered.
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
579
describe('switchMap', () => {
  // Maps every value to a single-value source for the compliance checks.
  const flattenSingle = operators.switchMap(x => sources.fromValue(x));

  // Shared compliance checks from ./compliance.
  passesPassivePull(flattenSingle);
  passesActivePush(flattenSingle);
  passesSinkClose(flattenSingle);
  passesSourcePushThenEnd(flattenSingle);
  passesSingleStart(flattenSingle);
  passesStrictEnd(flattenSingle);
  passesAsyncSequence(flattenSingle);

  // Synchronous case: switchMap behaves the same as concatMap & mergeMap here.
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sink = vi.fn();
    const toPair = (x: number) => sources.fromArray([x, x + 1]);

    operators.switchMap(toPair)(outer$)(sink);

    emit(1);
    emit(3);
    finish();

    const expectedSignals = [
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ];

    expect(sink).toHaveBeenCalledTimes(6);
    expect(sink.mock.calls).toEqual(expectedSignals);
  });

  // Synchronous case: switchMap behaves the same as concatMap & mergeMap here.
  it('lets inner sources finish when outer source ends', () => {
    const received: Signal<any>[] = [];
    const teardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes immediately.
    const sink = (signal: Signal<any>) => {
      received.push(signal);
      if (signal === SignalKind.End || signal.tag !== SignalKind.Start) return;
      signal[0](TalkbackKind.Pull);
      signal[0](TalkbackKind.Close);
    };

    const flattened$ = operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null));

    flattened$(sink);

    expect(teardown).toHaveBeenCalled();
    expect(received).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: switchMap behaves differently than concatMap & mergeMap.
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const delayedOuter$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = vi.fn();

    sinks.forEach(onValue)(
      operators.switchMap((x: number) => {
        // Each inner source yields x * 1 and x * 2 from a 5ms interval.
        const scaled$ = operators.map((y: number) => x * (y + 1))(sources.interval(5));
        return operators.take(2)(scaled$);
      })(delayedOuter$)
    );

    vi.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [10], [20]]);
  });
});
647
describe('take', () => {
  // Taking up to ten values, used for the compliance checks.
  const takeTen = operators.take(10);

  // Shared compliance checks from ./compliance.
  passesPassivePull(takeTen);
  passesActivePush(takeTen);
  passesSinkClose(takeTen);
  passesSourceEnd(takeTen);
  passesSingleStart(takeTen);
  passesStrictEnd(takeTen);
  passesAsyncSequence(takeTen);

  // A maximum of zero is checked separately.
  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = vi.fn();

    const firstOnly$ = operators.take(1)(input$);
    firstOnly$(sink);
    emit(1);

    // Start, the single allowed value, then End.
    const expectedSignals = [[start(expect.any(Function))], [push(1)], [SignalKind.End]];

    expect(sink).toHaveBeenCalledTimes(3);
    expect(sink.mock.calls).toEqual(expectedSignals);
  });
});
671
describe('takeUntil', () => {
  // With `sources.never` as the notifier, used for the compliance checks.
  const neverStopped = operators.takeUntil(sources.never);

  // Shared compliance checks from ./compliance.
  passesPassivePull(neverStopped);
  passesActivePush(neverStopped);
  passesSinkClose(neverStopped);
  passesSourcePushThenEnd(neverStopped);
  passesSingleStart(neverStopped);
  passesStrictEnd(neverStopped);
  passesAsyncSequence(neverStopped);

  // With a single-value source as the notifier.
  const stoppedByValue = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(stoppedByValue);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<any>();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = vi.fn();

    const untilNotified$ = operators.takeUntil(notifier$)(input$);
    untilNotified$(sink);
    emit(1);

    // Before the notifier fires: Start plus the pushed value.
    const signalsBeforeStop = [[start(expect.any(Function))], [push(1)]];
    expect(sink).toHaveBeenCalledTimes(2);
    expect(sink.mock.calls).toEqual(signalsBeforeStop);

    // The notifier firing adds exactly one more signal: End.
    notify(null);
    expect(sink).toHaveBeenCalledTimes(3);
    const thirdSignal = sink.mock.calls[2][0];
    expect(thirdSignal).toEqual(SignalKind.End);
  });
});
701
describe('takeWhile', () => {
  // A predicate that is always true, used for the compliance checks.
  const takeEverything = operators.takeWhile(() => true);

  // Shared compliance checks from ./compliance.
  passesPassivePull(takeEverything);
  passesActivePush(takeEverything);
  passesSinkClose(takeEverything);
  passesSourceEnd(takeEverything);
  passesSingleStart(takeEverything);
  passesAsyncSequence(takeEverything);

  // A predicate that is always false.
  const takeNothing = operators.takeWhile(() => false);
  passesCloseAndEnd(takeNothing);

  it('emits values while a predicate passes for all values', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const sink = vi.fn();
    const belowTwo = (x: any) => x < 2;

    const whileBelowTwo$ = operators.takeWhile(belowTwo)(input$);
    whileBelowTwo$(sink);
    emit(1);
    emit(2);

    // 1 passes; 2 fails the predicate and produces End instead of a push.
    const expectedSignals = [[start(expect.any(Function))], [push(1)], [SignalKind.End]];
    expect(sink.mock.calls).toEqual(expectedSignals);
  });
});
725
describe('takeLast', () => {
  // Shared compliance check from ./compliance, with a maximum of zero.
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const received: Signal<any>[] = [];

    let talkback: TalkbackFn;

    // Records every signal and answers Start and each push with another Pull.
    operators.takeLast(1)(input$)(signal => {
      received.push(signal);
      if (signal === SignalKind.End) return;
      if (signal.tag === SignalKind.Start) talkback = signal[0];
      talkback!(TalkbackKind.Pull);
    });

    emit(1);
    emit(2);

    // Nothing, not even Start, reaches the sink before the source ends.
    expect(received).toHaveLength(0);
    finish();

    const expectedSignals = [start(expect.any(Function)), push(2), SignalKind.End];
    expect(received).toEqual(expectedSignals);
  });
});
755
describe('throttle', () => {
  // Throttle with a zero-millisecond window, used for the compliance checks.
  const zeroThrottle = operators.throttle(() => 0);

  // Shared compliance checks from ./compliance.
  passesPassivePull(zeroThrottle);
  passesActivePush(zeroThrottle);
  passesSinkClose(zeroThrottle);
  passesSourceEnd(zeroThrottle);
  passesSingleStart(zeroThrottle);
  passesAsyncSequence(zeroThrottle);

  it('should ignore emissions for a period of time after a value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const throttled$ = operators.throttle(() => 100)(input$);
    sinks.forEach(onValue)(throttled$);

    // The first value goes through immediately.
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);
    vi.advanceTimersByTime(50);

    // 50ms in: still inside the 100ms window, so this value is dropped.
    emit(2);
    expect(onValue).toHaveBeenCalledTimes(1);
    vi.advanceTimersByTime(50);

    // 100ms in: the window has elapsed and values pass again.
    emit(3);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});