1import * as deriving from './helpers/wonka_deriving';
2import * as sources from './wonka_sources.gen';
3import * as sinks from './wonka_sinks.gen';
4import * as operators from './wonka_operators.gen';
5import * as web from './web/wonkaJs.gen';
6import * as types from './wonka_types.gen';
7
8/* This tests a noop operator for passive Pull talkback signals.
9 A Pull will be sent from the sink upwards and should pass through
10 the operator until the source receives it, which then pushes a
11 value down. */
12const passesPassivePull = (
13 operator: types.operatorT<any, any>,
14 output: any = 0
15) =>
16 it('responds to Pull talkback signals (spec)', () => {
17 let talkback = null;
18 let push = 0;
19 const values = [];
20
21 const source: types.sourceT<any> = sink => {
22 sink(deriving.start(tb => {
23 if (!push && tb === deriving.pull) {
24 push++;
25 sink(deriving.push(0));
26 }
27 }));
28 };
29
30 const sink: types.sinkT<any> = signal => {
31 expect(deriving.isEnd(signal)).toBeFalsy();
32 if (deriving.isPush(signal)) {
33 values.push(deriving.unboxPush(signal));
34 } else if (deriving.isStart(signal)) {
35 talkback = deriving.unboxStart(signal);
36 }
37 };
38
39 operator(source)(sink);
40 // The Start signal should always come in immediately
41 expect(talkback).not.toBe(null);
42 // No Push signals should be issued initially
43 expect(values).toEqual([]);
44
45 // When pulling a value we expect an immediate response
46 talkback(deriving.pull);
47 jest.runAllTimers();
48 expect(values).toEqual([output]);
49 });
50
51/* This tests a noop operator for regular, active Push signals.
52 A Push will be sent downwards from the source, through the
53 operator to the sink. Pull events should be let through from
54 the sink after every Push event. */
55const passesActivePush = (
56 operator: types.operatorT<any, any>,
57 result: any = 0
58) =>
59 it('responds to eager Push signals (spec)', () => {
60 const values = [];
61 let talkback = null;
62 let push = null;
63 let pulls = 0;
64
65 const source: types.sourceT<any> = sink => {
66 push = (value: any) => sink(deriving.push(value));
67 sink(deriving.start(tb => {
68 if (tb === deriving.pull)
69 pulls++;
70 }));
71 };
72
73 const sink: types.sinkT<any> = signal => {
74 expect(deriving.isEnd(signal)).toBeFalsy();
75 if (deriving.isStart(signal)) {
76 talkback = deriving.unboxStart(signal);
77 } else if (deriving.isPush(signal)) {
78 values.push(deriving.unboxPush(signal));
79 talkback(deriving.pull);
80 }
81 };
82
83 operator(source)(sink);
84 // No Pull signals should be issued initially
85 expect(pulls).toBe(0);
86
87 // When pushing a value we expect an immediate response
88 push(0);
89 jest.runAllTimers();
90 expect(values).toEqual([result]);
91 // Subsequently the Pull signal should have travelled upwards
92 expect(pulls).toBe(1);
93 });
94
95/* This tests a noop operator for Close talkback signals from the sink.
96 A Close signal will be sent, which should be forwarded to the source,
97 which then ends the communication without sending an End signal. */
98const passesSinkClose = (operator: types.operatorT<any, any>) =>
99 it('responds to Close signals from sink (spec)', () => {
100 let talkback = null;
101 let closing = 0;
102
103 const source: types.sourceT<any> = sink => {
104 sink(deriving.start(tb => {
105 if (tb === deriving.pull && !closing) {
106 sink(deriving.push(0));
107 } else if (tb === deriving.close) {
108 closing++;
109 }
110 }));
111 };
112
113 const sink: types.sinkT<any> = signal => {
114 expect(deriving.isEnd(signal)).toBeFalsy();
115 if (deriving.isStart(signal)) {
116 talkback = deriving.unboxStart(signal);
117 } else if (deriving.isPush(signal)) {
118 talkback(deriving.close);
119 }
120 };
121
122 operator(source)(sink);
123
124 // When pushing a value we expect an immediate close signal
125 talkback(deriving.pull);
126 jest.runAllTimers();
127 expect(closing).toBe(1);
128 });
129
130/* This tests a noop operator for End signals from the source.
131 A Push and End signal will be sent after the first Pull talkback
132 signal from the sink, which shouldn't lead to any extra Close or Pull
133 talkback signals. */
134const passesSourceEnd = (
135 operator: types.operatorT<any, any>,
136 result: any = 0
137) =>
138 it('passes on immediate Push then End signals from source (spec)', () => {
139 const signals = [];
140 let talkback = null;
141 let pulls = 0;
142 let ending = 0;
143
144 const source: types.sourceT<any> = sink => {
145 sink(deriving.start(tb => {
146 expect(tb).not.toBe(deriving.close);
147 if (tb === deriving.pull) {
148 pulls++;
149 if (pulls === 1) {
150 sink(deriving.push(0));
151 sink(deriving.end());
152 }
153 }
154 }));
155 };
156
157 const sink: types.sinkT<any> = signal => {
158 if (deriving.isStart(signal)) {
159 talkback = deriving.unboxStart(signal);
160 } else {
161 signals.push(signal);
162 if (deriving.isEnd(signal)) ending++;
163 }
164 };
165
166 operator(source)(sink);
167
168 // When pushing a value we expect an immediate Push then End signal
169 talkback(deriving.pull);
170 jest.runAllTimers();
171 expect(ending).toBe(1);
172 expect(signals).toEqual([deriving.push(result), deriving.end()]);
173 // Also no additional pull event should be created by the operator
174 expect(pulls).toBe(1);
175 });
176
177/* This tests a noop operator for End signals from the source
178 after the first pull in response to another.
179 This is similar to passesSourceEnd but more well behaved since
180 mergeMap/switchMap/concatMap are eager operators. */
181const passesSourcePushThenEnd = (
182 operator: types.operatorT<any, any>,
183 result: any = 0
184) =>
185 it('passes on End signals from source (spec)', () => {
186 const signals = [];
187 let talkback = null;
188 let pulls = 0;
189 let ending = 0;
190
191 const source: types.sourceT<any> = sink => {
192 sink(deriving.start(tb => {
193 expect(tb).not.toBe(deriving.close);
194 if (tb === deriving.pull) {
195 pulls++;
196 if (pulls <= 2) { sink(deriving.push(0)); }
197 else { sink(deriving.end()); }
198 }
199 }));
200 };
201
202 const sink: types.sinkT<any> = signal => {
203 if (deriving.isStart(signal)) {
204 talkback = deriving.unboxStart(signal);
205 } else {
206 signals.push(signal);
207 if (deriving.isPush(signal)) talkback(deriving.pull);
208 if (deriving.isEnd(signal)) ending++;
209 }
210 };
211
212 operator(source)(sink);
213
214 // When pushing a value we expect an immediate Push then End signal
215 talkback(deriving.pull);
216 jest.runAllTimers();
217 expect(ending).toBe(1);
218 expect(pulls).toBe(3);
219 expect(signals).toEqual([
220 deriving.push(result),
221 deriving.push(result),
222 deriving.end()
223 ]);
224 });
225
226/* This tests a noop operator for Start signals from the source.
227 When the operator's sink is started by the source it'll receive
228 a Start event. As a response it should never send more than one
229 Start signals to the sink. */
230const passesSingleStart = (operator: types.operatorT<any, any>) =>
231 it('sends a single Start event to the incoming sink (spec)', () => {
232 let start = 0;
233
234 const source: types.sourceT<any> = sink => {
235 sink(deriving.start(() => {}));
236 };
237
238 const sink: types.sinkT<any> = signal => {
239 if (deriving.isStart(signal)) start++;
240 };
241
242 // When starting the operator we expect a single start event on the sink
243 operator(source)(sink);
244 expect(start).toBe(1);
245 });
246
247/* This tests a noop operator for silence after End signals from the source.
248 When the operator receives the End signal it shouldn't forward any other
249 signals to the sink anymore.
250 This isn't a strict requirement, but some operators should ensure that
251 all sources are well behaved. This is particularly true for operators
252 that either Close sources themselves or may operate on multiple sources. */
253const passesStrictEnd = (operator: types.operatorT<any, any>) => {
254 it('stops all signals after End has been received (spec: strict end)', () => {
255 let pulls = 0;
256 const signals = [];
257
258 const source: types.sourceT<any> = sink => {
259 sink(deriving.start(tb => {
260 if (tb === deriving.pull) {
261 pulls++;
262 sink(deriving.end());
263 sink(deriving.push(123));
264 }
265 }));
266 };
267
268 const sink: types.sinkT<any> = signal => {
269 if (deriving.isStart(signal)) {
270 deriving.unboxStart(signal)(deriving.pull);
271 } else {
272 signals.push(signal);
273 }
274 };
275
276 operator(source)(sink);
277
278 // The Push signal should've been dropped
279 jest.runAllTimers();
280 expect(signals).toEqual([deriving.end()]);
281 expect(pulls).toBe(1);
282 });
283
284 it('stops all signals after Close has been received (spec: strict close)', () => {
285 const signals = [];
286
287 const source: types.sourceT<any> = sink => {
288 sink(deriving.start(tb => {
289 if (tb === deriving.close) {
290 sink(deriving.push(123));
291 }
292 }));
293 };
294
295 const sink: types.sinkT<any> = signal => {
296 if (deriving.isStart(signal)) {
297 deriving.unboxStart(signal)(deriving.close);
298 } else {
299 signals.push(signal);
300 }
301 };
302
303 operator(source)(sink);
304
305 // The Push signal should've been dropped
306 jest.runAllTimers();
307 expect(signals).toEqual([]);
308 });
309};
310
311/* This tests an immediately closing operator for End signals to
312 the sink and Close signals to the source.
313 When an operator closes immediately we expect to see a Close
314 signal at the source and an End signal to the sink, since the
315 closing operator is expected to end the entire chain. */
316const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
317 it('closes the source and ends the sink correctly (spec: ending operator)', () => {
318 let closing = 0;
319 let ending = 0;
320
321 const source: types.sourceT<any> = sink => {
322 sink(deriving.start(tb => {
323 // For some operator tests we do need to send a single value
324 if (tb === deriving.pull)
325 sink(deriving.push(null));
326 if (tb === deriving.close)
327 closing++;
328 }));
329 };
330
331 const sink: types.sinkT<any> = signal => {
332 if (deriving.isStart(signal)) {
333 deriving.unboxStart(signal)(deriving.pull);
334 } if (deriving.isEnd(signal)) {
335 ending++;
336 }
337 };
338
339 // We expect the operator to immediately end and close
340 closingOperator(source)(sink);
341 expect(closing).toBe(1);
342 expect(ending).toBe(1);
343 });
344
345const passesAsyncSequence = (
346 operator: types.operatorT<any, any>,
347 result: any = 0
348) =>
349 it('passes an async push with an async end (spec)', () => {
350 let hasPushed = false;
351 const signals = [];
352
353 const source: types.sourceT<any> = sink => {
354 sink(deriving.start(tb => {
355 if (tb === deriving.pull && !hasPushed) {
356 hasPushed = true;
357 setTimeout(() => sink(deriving.push(0)), 10);
358 setTimeout(() => sink(deriving.end()), 20);
359 }
360 }));
361 };
362
363 const sink: types.sinkT<any> = signal => {
364 if (deriving.isStart(signal)) {
365 setTimeout(() => {
366 deriving.unboxStart(signal)(deriving.pull);
367 }, 5);
368 } else {
369 signals.push(signal);
370 }
371 };
372
373 // We initially expect to see the push signal
374 // Afterwards after all timers all other signals come in
375 operator(source)(sink);
376 expect(signals.length).toBe(0);
377 jest.advanceTimersByTime(5);
378 expect(hasPushed).toBeTruthy();
379 jest.runAllTimers();
380
381 expect(signals).toEqual([
382 deriving.push(result),
383 deriving.end()
384 ]);
385 });
386
387beforeEach(() => {
388 jest.useFakeTimers();
389});
390
391describe('combine', () => {
392 const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source);
393
394 passesPassivePull(noop, [0, 0]);
395 passesActivePush(noop, [0, 0]);
396 passesSinkClose(noop);
397 passesSourceEnd(noop, [0, 0]);
398 passesSingleStart(noop);
399 passesStrictEnd(noop);
400
401 it('emits the zipped values of two sources', () => {
402 const { source: sourceA, next: nextA } = sources.makeSubject();
403 const { source: sourceB, next: nextB } = sources.makeSubject();
404 const fn = jest.fn();
405
406 sinks.forEach(fn)(operators.combine(sourceA, sourceB));
407
408 nextA(1);
409 expect(fn).not.toHaveBeenCalled();
410 nextB(2);
411 expect(fn).toHaveBeenCalledWith([1, 2]);
412 });
413});
414
415describe('buffer', () => {
416 const valueThenNever: types.sourceT<any> = sink =>
417 sink(deriving.start(tb => {
418 if (tb === deriving.pull)
419 sink(deriving.push(null));
420 }));
421
422 const noop = operators.buffer(valueThenNever);
423
424 passesPassivePull(noop, [0]);
425 passesActivePush(noop, [0]);
426 passesSinkClose(noop);
427 passesSourcePushThenEnd(noop, [0]);
428 passesSingleStart(noop);
429 passesStrictEnd(noop);
430
431 it('emits batches of input values when a notifier emits', () => {
432 const { source: notifier$, next: notify } = sources.makeSubject();
433 const { source: input$, next } = sources.makeSubject();
434 const fn = jest.fn();
435
436 sinks.forEach(fn)(operators.buffer(notifier$)(input$));
437
438 next(1);
439 next(2);
440 expect(fn).not.toHaveBeenCalled();
441
442 notify(null);
443 expect(fn).toHaveBeenCalledWith([1, 2]);
444
445 next(3);
446 notify(null);
447 expect(fn).toHaveBeenCalledWith([3]);
448 });
449});
450
451describe('concatMap', () => {
452 const noop = operators.concatMap(x => sources.fromValue(x));
453 passesPassivePull(noop);
454 passesActivePush(noop);
455 passesSinkClose(noop);
456 passesSourcePushThenEnd(noop);
457 passesSingleStart(noop);
458 passesStrictEnd(noop);
459 passesAsyncSequence(noop);
460
461 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
462 it('emits values from each flattened synchronous source', () => {
463 const { source, next, complete } = sources.makeSubject<number>();
464 const fn = jest.fn();
465
466 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
467
468 next(1);
469 next(3);
470 complete();
471
472 expect(fn).toHaveBeenCalledTimes(6);
473 expect(fn.mock.calls).toEqual([
474 [deriving.start(expect.any(Function))],
475 [deriving.push(1)],
476 [deriving.push(2)],
477 [deriving.push(3)],
478 [deriving.push(4)],
479 [deriving.end()],
480 ]);
481 });
482
483 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
484 it('lets inner sources finish when outer source ends', () => {
485 const values = [];
486 const teardown = jest.fn();
487 const fn = (signal: types.signalT<any>) => {
488 values.push(signal);
489 if (deriving.isStart(signal)) {
490 deriving.unboxStart(signal)(deriving.pull);
491 deriving.unboxStart(signal)(deriving.close);
492 }
493 };
494
495 operators.concatMap(() => {
496 return sources.make(() => teardown);
497 })(sources.fromValue(null))(fn);
498
499 expect(teardown).toHaveBeenCalled();
500 expect(values).toEqual([
501 deriving.start(expect.any(Function)),
502 ]);
503 });
504
505 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
506 it('emits values from each flattened asynchronous source, one at a time', () => {
507 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
508 const fn = jest.fn();
509
510 sinks.forEach(fn)(
511 operators.concatMap((x: number) => {
512 return web.delay(5)(sources.fromArray([x, x * 2]));
513 })(source)
514 );
515
516 jest.advanceTimersByTime(14);
517 expect(fn.mock.calls).toEqual([
518 [1],
519 [2],
520 ]);
521
522 jest.runAllTimers();
523 expect(fn.mock.calls).toEqual([
524 [1],
525 [2],
526 [10],
527 [20],
528 ]);
529 });
530
531 it('works for fully asynchronous sources', () => {
532 const fn = jest.fn();
533
534 sinks.forEach(fn)(
535 operators.concatMap(() => {
536 return sources.make(observer => {
537 setTimeout(() => observer.next(1));
538 return () => {};
539 })
540 })(sources.fromValue(null))
541 );
542
543 jest.runAllTimers();
544 expect(fn).toHaveBeenCalledWith(1);
545 });
546
547 it('emits synchronous values in order', () => {
548 const values = [];
549
550 sinks.forEach(x => values.push(x))(
551 operators.concat([
552 sources.fromArray([1, 2]),
553 sources.fromArray([3, 4])
554 ])
555 );
556
557 expect(values).toEqual([ 1, 2, 3, 4 ]);
558 });
559});
560
561describe('debounce', () => {
562 const noop = web.debounce(() => 0);
563 passesPassivePull(noop);
564 passesActivePush(noop);
565 passesSinkClose(noop);
566 passesSourceEnd(noop);
567 passesSingleStart(noop);
568 passesStrictEnd(noop);
569 passesAsyncSequence(noop);
570
571 it('waits for a specified amount of silence before emitting the last value', () => {
572 const { source, next } = sources.makeSubject<number>();
573 const fn = jest.fn();
574
575 sinks.forEach(fn)(web.debounce(() => 100)(source));
576
577 next(1);
578 jest.advanceTimersByTime(50);
579 expect(fn).not.toHaveBeenCalled();
580
581 next(2);
582 jest.advanceTimersByTime(99);
583 expect(fn).not.toHaveBeenCalled();
584
585 jest.advanceTimersByTime(1);
586 expect(fn).toHaveBeenCalledWith(2);
587 });
588
589 it('emits debounced value with delayed End signal', () => {
590 const { source, next, complete } = sources.makeSubject<number>();
591 const fn = jest.fn();
592
593 sinks.forEach(fn)(web.debounce(() => 100)(source));
594
595 next(1);
596 complete();
597 jest.advanceTimersByTime(100);
598 expect(fn).toHaveBeenCalled();
599 });
600});
601
602describe('delay', () => {
603 const noop = web.delay(0);
604 passesPassivePull(noop);
605 passesActivePush(noop);
606 passesSinkClose(noop);
607 passesSourceEnd(noop);
608 passesSingleStart(noop);
609 passesAsyncSequence(noop);
610
611 it('delays outputs by a specified delay timeout value', () => {
612 const { source, next } = sources.makeSubject();
613 const fn = jest.fn();
614
615 sinks.forEach(fn)(web.delay(100)(source));
616
617 next(1);
618 expect(fn).not.toHaveBeenCalled();
619
620 jest.advanceTimersByTime(100);
621 expect(fn).toHaveBeenCalledWith(1);
622 });
623});
624
625describe('filter', () => {
626 const noop = operators.filter(() => true);
627 passesPassivePull(noop);
628 passesActivePush(noop);
629 passesSinkClose(noop);
630 passesSourceEnd(noop);
631 passesSingleStart(noop);
632 passesAsyncSequence(noop);
633
634 it('prevents emissions for which a predicate fails', () => {
635 const { source, next } = sources.makeSubject();
636 const fn = jest.fn();
637
638 sinks.forEach(fn)(operators.filter(x => !!x)(source));
639
640 next(false);
641 expect(fn).not.toHaveBeenCalled();
642
643 next(true);
644 expect(fn).toHaveBeenCalledWith(true);
645 });
646});
647
648describe('map', () => {
649 const noop = operators.map(x => x);
650 passesPassivePull(noop);
651 passesActivePush(noop);
652 passesSinkClose(noop);
653 passesSourceEnd(noop);
654 passesSingleStart(noop);
655 passesAsyncSequence(noop);
656
657 it('maps over values given a transform function', () => {
658 const { source, next } = sources.makeSubject<number>();
659 const fn = jest.fn();
660
661 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));
662
663 next(1);
664 expect(fn).toHaveBeenCalledWith(2);
665 });
666});
667
668describe('mergeMap', () => {
669 const noop = operators.mergeMap(x => sources.fromValue(x));
670 passesPassivePull(noop);
671 passesActivePush(noop);
672 passesSinkClose(noop);
673 passesSourcePushThenEnd(noop);
674 passesSingleStart(noop);
675 passesStrictEnd(noop);
676 passesAsyncSequence(noop);
677
678 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
679 it('emits values from each flattened synchronous source', () => {
680 const { source, next, complete } = sources.makeSubject<number>();
681 const fn = jest.fn();
682
683 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
684
685 next(1);
686 next(3);
687 complete();
688
689 expect(fn.mock.calls).toEqual([
690 [deriving.start(expect.any(Function))],
691 [deriving.push(1)],
692 [deriving.push(2)],
693 [deriving.push(3)],
694 [deriving.push(4)],
695 [deriving.end()],
696 ]);
697 });
698
699 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
700 it('lets inner sources finish when outer source ends', () => {
701 const values = [];
702 const teardown = jest.fn();
703 const fn = (signal: types.signalT<any>) => {
704 values.push(signal);
705 if (deriving.isStart(signal)) {
706 deriving.unboxStart(signal)(deriving.pull);
707 deriving.unboxStart(signal)(deriving.close);
708 }
709 };
710
711 operators.mergeMap(() => {
712 return sources.make(() => teardown);
713 })(sources.fromValue(null))(fn);
714
715 expect(teardown).toHaveBeenCalled();
716 expect(values).toEqual([
717 deriving.start(expect.any(Function)),
718 ]);
719 });
720
721 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
722 it('emits values from each flattened asynchronous source simultaneously', () => {
723 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
724 const fn = jest.fn();
725
726 sinks.forEach(fn)(
727 operators.mergeMap((x: number) => {
728 return web.delay(5)(sources.fromArray([x, x * 2]));
729 })(source)
730 );
731
732 jest.runAllTimers();
733 expect(fn.mock.calls).toEqual([
734 [1],
735 [10],
736 [2],
737 [20],
738 ]);
739 });
740
741 it('emits synchronous values in order', () => {
742 const values = [];
743
744 sinks.forEach(x => values.push(x))(
745 operators.merge([
746 sources.fromArray([1, 2]),
747 sources.fromArray([3, 4])
748 ])
749 );
750
751 expect(values).toEqual([ 1, 2, 3, 4 ]);
752 });
753});
754
755describe('onEnd', () => {
756 const noop = operators.onEnd(() => {});
757 passesPassivePull(noop);
758 passesActivePush(noop);
759 passesSinkClose(noop);
760 passesSourceEnd(noop);
761 passesSingleStart(noop);
762 passesAsyncSequence(noop);
763
764 it('calls a callback when the source ends', () => {
765 const { source, next, complete } = sources.makeSubject<number>();
766 const fn = jest.fn();
767
768 sinks.forEach(() => {})(operators.onEnd(fn)(source));
769
770 next(null);
771 expect(fn).not.toHaveBeenCalled();
772
773 complete();
774 expect(fn).toHaveBeenCalled();
775 });
776});
777
778describe('onPush', () => {
779 const noop = operators.onPush(() => {});
780 passesPassivePull(noop);
781 passesActivePush(noop);
782 passesSinkClose(noop);
783 passesSourceEnd(noop);
784 passesSingleStart(noop);
785 passesAsyncSequence(noop);
786
787 it('calls a callback when the source emits', () => {
788 const { source, next } = sources.makeSubject<number>();
789 const fn = jest.fn();
790
791 sinks.forEach(() => {})(operators.onPush(fn)(source));
792
793 next(1);
794 expect(fn).toHaveBeenCalledWith(1);
795 next(2);
796 expect(fn).toHaveBeenCalledWith(2);
797 });
798
799 it('is the same as `tap`', () => {
800 expect(operators.onPush).toBe(operators.tap);
801 });
802});
803
804describe('onStart', () => {
805 const noop = operators.onStart(() => {});
806 passesPassivePull(noop);
807 passesActivePush(noop);
808 passesSinkClose(noop);
809 passesSourceEnd(noop);
810 passesSingleStart(noop);
811 passesAsyncSequence(noop);
812
813 it('is called when the source starts', () => {
814 let sink: types.sinkT<any>;
815
816 const fn = jest.fn();
817 const source: types.sourceT<any> = _sink => { sink = _sink; };
818
819 sinks.forEach(() => {})(operators.onStart(fn)(source));
820
821 expect(fn).not.toHaveBeenCalled();
822
823 sink(deriving.start(() => {}));
824 expect(fn).toHaveBeenCalled();
825 });
826});
827
828describe('sample', () => {
829 const valueThenNever: types.sourceT<any> = sink =>
830 sink(deriving.start(tb => {
831 if (tb === deriving.pull)
832 sink(deriving.push(null));
833 }));
834
835 const noop = operators.sample(valueThenNever);
836
837 passesPassivePull(noop);
838 passesActivePush(noop);
839 passesSinkClose(noop);
840 passesSourcePushThenEnd(noop);
841 passesSingleStart(noop);
842 passesStrictEnd(noop);
843
844 it('emits the latest value when a notifier source emits', () => {
845 const { source: notifier$, next: notify } = sources.makeSubject();
846 const { source: input$, next } = sources.makeSubject();
847 const fn = jest.fn();
848
849 sinks.forEach(fn)(operators.sample(notifier$)(input$));
850
851 next(1);
852 next(2);
853 expect(fn).not.toHaveBeenCalled();
854
855 notify(null);
856 expect(fn).toHaveBeenCalledWith(2);
857 });
858});
859
860describe('scan', () => {
861 const noop = operators.scan((_acc, x) => x, null);
862 passesPassivePull(noop);
863 passesActivePush(noop);
864 passesSinkClose(noop);
865 passesSourceEnd(noop);
866 passesSingleStart(noop);
867 passesAsyncSequence(noop);
868
869 it('folds values continuously with a reducer and initial value', () => {
870 const { source: input$, next } = sources.makeSubject<number>();
871 const fn = jest.fn();
872
873 const reducer = (acc: number, x: number) => acc + x;
874 sinks.forEach(fn)(operators.scan(reducer, 0)(input$));
875
876 next(1);
877 expect(fn).toHaveBeenCalledWith(1);
878 next(2);
879 expect(fn).toHaveBeenCalledWith(3);
880 });
881});
882
883describe('share', () => {
884 const noop = operators.share;
885 passesPassivePull(noop);
886 passesActivePush(noop);
887 passesSinkClose(noop);
888 passesSourceEnd(noop);
889 passesSingleStart(noop);
890 passesStrictEnd(noop);
891 passesAsyncSequence(noop);
892
893 it('shares output values between sinks', () => {
894 let push = () => {};
895
896 const source: types.sourceT<any> = operators.share(sink => {
897 sink(deriving.start(() => {}));
898 push = () => {
899 sink(deriving.push([0]));
900 sink(deriving.end());
901 };
902 });
903
904 const fnA = jest.fn();
905 const fnB = jest.fn();
906
907 sinks.forEach(fnA)(source);
908 sinks.forEach(fnB)(source);
909 push();
910
911 expect(fnA).toHaveBeenCalledWith([0]);
912 expect(fnB).toHaveBeenCalledWith([0]);
913 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
914 });
915});
916
917describe('skip', () => {
918 const noop = operators.skip(0);
919 passesPassivePull(noop);
920 passesActivePush(noop);
921 passesSinkClose(noop);
922 passesSourceEnd(noop);
923 passesSingleStart(noop);
924 passesAsyncSequence(noop);
925
926 it('skips a number of values before emitting normally', () => {
927 const { source, next } = sources.makeSubject<number>();
928 const fn = jest.fn();
929
930 sinks.forEach(fn)(operators.skip(1)(source));
931
932 next(1);
933 expect(fn).not.toHaveBeenCalled();
934 next(2);
935 expect(fn).toHaveBeenCalledWith(2);
936 });
937});
938
939describe('skipUntil', () => {
940 const noop = operators.skipUntil(sources.fromValue(null));
941 passesPassivePull(noop);
942 passesActivePush(noop);
943 passesSinkClose(noop);
944 passesSourceEnd(noop);
945 passesSingleStart(noop);
946 passesAsyncSequence(noop);
947 passesStrictEnd(noop);
948
949 it('skips values until the notifier source emits', () => {
950 const { source: notifier$, next: notify } = sources.makeSubject();
951 const { source: input$, next } = sources.makeSubject<number>();
952 const fn = jest.fn();
953
954 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));
955
956 next(1);
957 expect(fn).not.toHaveBeenCalled();
958 notify(null);
959 next(2);
960 expect(fn).toHaveBeenCalledWith(2);
961 });
962});
963
964describe('skipWhile', () => {
965 const noop = operators.skipWhile(() => false);
966 passesPassivePull(noop);
967 passesActivePush(noop);
968 passesSinkClose(noop);
969 passesSourceEnd(noop);
970 passesSingleStart(noop);
971 passesAsyncSequence(noop);
972
973 it('skips values until one fails a predicate', () => {
974 const { source, next } = sources.makeSubject<number>();
975 const fn = jest.fn();
976
977 sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source));
978
979 next(1);
980 expect(fn).not.toHaveBeenCalled();
981 next(2);
982 expect(fn).toHaveBeenCalledWith(2);
983 });
984});
985
986describe('switchMap', () => {
987 const noop = operators.switchMap(x => sources.fromValue(x));
988 passesPassivePull(noop);
989 passesActivePush(noop);
990 passesSinkClose(noop);
991 passesSourcePushThenEnd(noop);
992 passesSingleStart(noop);
993 passesStrictEnd(noop);
994 passesAsyncSequence(noop);
995
996 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
997 it('emits values from each flattened synchronous source', () => {
998 const { source, next, complete } = sources.makeSubject<number>();
999 const fn = jest.fn();
1000
1001 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
1002
1003 next(1);
1004 next(3);
1005 complete();
1006
1007 expect(fn).toHaveBeenCalledTimes(6);
1008 expect(fn.mock.calls).toEqual([
1009 [deriving.start(expect.any(Function))],
1010 [deriving.push(1)],
1011 [deriving.push(2)],
1012 [deriving.push(3)],
1013 [deriving.push(4)],
1014 [deriving.end()],
1015 ]);
1016 });
1017
1018 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
1019 it('lets inner sources finish when outer source ends', () => {
1020 const values = [];
1021 const teardown = jest.fn();
1022 const fn = (signal: types.signalT<any>) => {
1023 values.push(signal);
1024 if (deriving.isStart(signal)) {
1025 deriving.unboxStart(signal)(deriving.pull);
1026 deriving.unboxStart(signal)(deriving.close);
1027 }
1028 };
1029
1030 operators.switchMap(() => {
1031 return sources.make(() => teardown);
1032 })(sources.fromValue(null))(fn);
1033
1034 expect(teardown).toHaveBeenCalled();
1035 expect(values).toEqual([
1036 deriving.start(expect.any(Function)),
1037 ]);
1038 });
1039
1040 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
1041 it('emits values from each flattened asynchronous source, one at a time', () => {
1042 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
1043 const fn = jest.fn();
1044
1045 sinks.forEach(fn)(
1046 operators.switchMap((x: number) => (
1047 operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5)))
1048 ))(source)
1049 );
1050
1051 jest.runAllTimers();
1052 expect(fn.mock.calls).toEqual([
1053 [1],
1054 [10],
1055 [20],
1056 ]);
1057 });
1058});
1059
1060describe('take', () => {
1061 const noop = operators.take(10);
1062 passesPassivePull(noop);
1063 passesActivePush(noop);
1064 passesSinkClose(noop);
1065 passesSourceEnd(noop);
1066 passesSingleStart(noop);
1067 passesStrictEnd(noop);
1068 passesAsyncSequence(noop);
1069
1070 passesCloseAndEnd(operators.take(0));
1071
1072 it('emits values until a maximum is reached', () => {
1073 const { source, next } = sources.makeSubject<number>();
1074 const fn = jest.fn();
1075
1076 operators.take(1)(source)(fn);
1077 next(1);
1078
1079 expect(fn).toHaveBeenCalledTimes(3);
1080 expect(fn.mock.calls).toEqual([
1081 [deriving.start(expect.any(Function))],
1082 [deriving.push(1)],
1083 [deriving.end()],
1084 ]);
1085 });
1086});
1087
1088describe('takeUntil', () => {
1089 const noop = operators.takeUntil(sources.never);
1090 passesPassivePull(noop);
1091 passesActivePush(noop);
1092 passesSinkClose(noop);
1093 passesSourcePushThenEnd(noop);
1094 passesSingleStart(noop);
1095 passesStrictEnd(noop);
1096 passesAsyncSequence(noop);
1097
1098 const ending = operators.takeUntil(sources.fromValue(null));
1099 passesCloseAndEnd(ending);
1100
1101 it('emits values until a notifier emits', () => {
1102 const { source: notifier$, next: notify } = sources.makeSubject<number>();
1103 const { source: input$, next } = sources.makeSubject<number>();
1104 const fn = jest.fn();
1105
1106 operators.takeUntil(notifier$)(input$)(fn);
1107 next(1);
1108
1109 expect(fn).toHaveBeenCalledTimes(2);
1110 expect(fn.mock.calls).toEqual([
1111 [deriving.start(expect.any(Function))],
1112 [deriving.push(1)],
1113 ]);
1114
1115 notify(null);
1116 expect(fn).toHaveBeenCalledTimes(3);
1117 expect(fn.mock.calls[2][0]).toEqual(deriving.end());
1118 });
1119});
1120
1121describe('takeWhile', () => {
1122 const noop = operators.takeWhile(() => true);
1123 passesPassivePull(noop);
1124 passesActivePush(noop);
1125 passesSinkClose(noop);
1126 passesSourceEnd(noop);
1127 passesSingleStart(noop);
1128 passesAsyncSequence(noop);
1129
1130 const ending = operators.takeWhile(() => false);
1131 passesCloseAndEnd(ending);
1132
1133 it('emits values while a predicate passes for all values', () => {
1134 const { source, next } = sources.makeSubject<number>();
1135 const fn = jest.fn();
1136
1137 operators.takeWhile(x => x < 2)(source)(fn);
1138 next(1);
1139 next(2);
1140
1141 expect(fn.mock.calls).toEqual([
1142 [deriving.start(expect.any(Function))],
1143 [deriving.push(1)],
1144 [deriving.end()],
1145 ]);
1146 });
1147});
1148
1149describe('takeLast', () => {
1150 passesCloseAndEnd(operators.takeLast(0));
1151
1152 it('emits the last max values of an ended source', () => {
1153 const { source, next, complete } = sources.makeSubject<number>();
1154 const values = [];
1155
1156 let talkback;
1157 operators.takeLast(1)(source)(signal => {
1158 values.push(signal);
1159 if (deriving.isStart(signal))
1160 talkback = deriving.unboxStart(signal);
1161 if (!deriving.isEnd(signal))
1162 talkback(deriving.pull);
1163 });
1164
1165 next(1);
1166 next(2);
1167
1168 expect(values.length).toBe(0);
1169 complete();
1170
1171 expect(values).toEqual([
1172 deriving.start(expect.any(Function)),
1173 deriving.push(2),
1174 deriving.end(),
1175 ]);
1176 });
1177});
1178
1179describe('throttle', () => {
1180 const noop = web.throttle(() => 0);
1181 passesPassivePull(noop);
1182 passesActivePush(noop);
1183 passesSinkClose(noop);
1184 passesSourceEnd(noop);
1185 passesSingleStart(noop);
1186 passesAsyncSequence(noop);
1187
1188 it('should ignore emissions for a period of time after a value', () => {
1189 const { source, next } = sources.makeSubject<number>();
1190 const fn = jest.fn();
1191
1192 sinks.forEach(fn)(web.throttle(() => 100)(source));
1193
1194 next(1);
1195 expect(fn).toHaveBeenCalledWith(1);
1196 jest.advanceTimersByTime(50);
1197
1198 next(2);
1199 expect(fn).toHaveBeenCalledTimes(1);
1200 jest.advanceTimersByTime(50);
1201
1202 next(3);
1203 expect(fn).toHaveBeenCalledWith(3);
1204 });
1205});