1import * as deriving from './helpers/Wonka_deriving';
2import * as sources from './Wonka_sources.gen';
3import * as sinks from './Wonka_sinks.gen';
4import * as operators from './Wonka_operators.gen';
5import * as web from './web/WonkaJs.gen';
6import * as types from './Wonka_types.gen';
7
/* Spec: passive Pull talkback signals must travel through a noop operator.
   The sink issues a Pull, which has to reach the source; the source then
   answers it by pushing a single value back down to the sink.
   `output` is the value the sink is expected to receive (defaults to 0). */
const passesPassivePull = (
  operator: types.operatorT<any, any>,
  output: any = 0
) =>
  it('responds to Pull talkback signals (spec)', () => {
    const received = [];
    let hasPushed = false;
    let talkback = null;

    // Emits exactly one Push of `0`, and only in response to the first Pull
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          if (hasPushed || request !== deriving.pull) return;
          hasPushed = true;
          observer(deriving.push(0));
        })
      );
    };

    // Records pushed values and keeps the talkback; an End is never expected
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        return;
      }
      if (deriving.isStart(signal)) talkback = deriving.unboxStart(signal);
    };

    operator(source)(sink);

    // Start has to arrive synchronously...
    expect(talkback).not.toBe(null);
    // ...and nothing may be pushed before the sink asks for it
    expect(received).toEqual([]);

    // Pulling once should yield exactly the expected output
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(received).toEqual([output]);
  });
50
/* Spec: active (eager) Push signals must travel through a noop operator.
   The source pushes a value without being asked; it has to reach the sink,
   and the Pull the sink sends after each Push must travel back up to the
   source. `result` is the value the sink is expected to see (defaults to 0). */
const passesActivePush = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('responds to eager Push signals (spec)', () => {
    let pullCount = 0;
    let emit = null;
    let talkback = null;
    const received = [];

    // Exposes `emit` so the test can push at will, and counts incoming Pulls
    const source: types.sourceT<any> = observer => {
      emit = (value: any) => observer(deriving.push(value));
      observer(
        deriving.start(request => {
          if (request === deriving.pull) pullCount += 1;
        })
      );
    };

    // Stores the talkback on Start and answers every Push with a Pull
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        talkback(deriving.pull);
      }
    };

    operator(source)(sink);
    // Subscribing alone must not cause any Pull
    expect(pullCount).toBe(0);

    // An eagerly pushed value should come out at the sink
    emit(0);
    jest.runAllTimers();
    expect(received).toEqual([result]);
    // The sink's Pull response should have reached the source exactly once
    expect(pullCount).toBe(1);
  });
94
/* Spec: Close talkback signals from the sink must reach the source.
   The sink closes as soon as it receives its first value; the operator has
   to forward that Close upwards, and no End may be sent to the sink. */
const passesSinkClose = (operator: types.operatorT<any, any>) =>
  it('responds to Close signals from sink (spec)', () => {
    let closeCount = 0;
    let talkback = null;

    // Pushes on Pull while still open, and counts every Close it receives
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          if (request === deriving.pull && closeCount === 0) {
            observer(deriving.push(0));
            return;
          }
          if (request === deriving.close) closeCount += 1;
        })
      );
    };

    // Stores the talkback on Start and closes on the first Push
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) talkback(deriving.close);
    };

    operator(source)(sink);

    // The Pull triggers a Push, which in turn triggers the sink's Close
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(closeCount).toBe(1);
  });
129
/* Spec: End signals from the source must be passed on to the sink.
   On the first Pull the source sends a Push immediately followed by an End.
   The operator must forward both and must not send a Close to the source;
   the test also checks that the source saw exactly one Pull.
   `result` is the value the sink is expected to see (defaults to 0). */
const passesSourceEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on immediate Push then End signals from source (spec)', () => {
    let endCount = 0;
    let pullCount = 0;
    let talkback = null;
    const received = [];

    // Answers only the first Pull, with a Push directly followed by End
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          expect(request).not.toBe(deriving.close);
          if (request !== deriving.pull) return;

          pullCount += 1;
          if (pullCount !== 1) return;

          observer(deriving.push(0));
          observer(deriving.end());
        })
      );
    };

    // Stores the talkback on Start; records all other signals and counts Ends
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        return;
      }

      received.push(signal);
      if (deriving.isEnd(signal)) endCount += 1;
    };

    operator(source)(sink);

    // A single Pull should produce the Push and then the End
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(received).toEqual([deriving.push(result), deriving.end()]);
    // The operator must not have pulled again on its own
    expect(pullCount).toBe(1);
  });
176
/* Spec: End signals from the source must be passed on, where the End is
   sent in response to a later Pull rather than right after a Push.
   This is the better-behaved sibling of passesSourceEnd and is meant for
   eager operators such as mergeMap/switchMap/concatMap.
   `result` is the value the sink is expected to see (defaults to 0). */
const passesSourcePushThenEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on End signals from source (spec)', () => {
    let endCount = 0;
    let pullCount = 0;
    let talkback = null;
    const received = [];

    // Pushes for the first two Pulls and ends on the third
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          expect(request).not.toBe(deriving.close);
          if (request !== deriving.pull) return;

          pullCount += 1;
          observer(pullCount <= 2 ? deriving.push(0) : deriving.end());
        })
      );
    };

    // Stores the talkback on Start; otherwise records the signal, pulls again
    // after every Push and counts Ends
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        return;
      }

      received.push(signal);
      if (deriving.isPush(signal)) talkback(deriving.pull);
      if (deriving.isEnd(signal)) endCount += 1;
    };

    operator(source)(sink);

    // The initial Pull kicks off the Push, Push, End sequence
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(endCount).toBe(1);
    expect(pullCount).toBe(3);
    expect(received).toEqual([
      deriving.push(result),
      deriving.push(result),
      deriving.end()
    ]);
  });
225
/* Spec: an operator must hand exactly one Start signal to its sink.
   The source starts the operator once; in response the sink must never
   see more than a single Start. */
const passesSingleStart = (operator: types.operatorT<any, any>) =>
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    // Starts once with a talkback that ignores everything
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(() => {}));
    };

    // Only counts Start signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) startCount += 1;
    };

    // Subscribing should result in one, and only one, Start on the sink
    operator(source)(sink);
    expect(startCount).toBe(1);
  });
246
/* Spec: a strict operator stays silent once the chain has terminated.
   After it has received an End from the source, or a Close from the sink,
   no further signals may be forwarded to the sink.
   Not every operator has to guarantee this, but operators that close
   sources themselves, or that work with several sources, should make sure
   that misbehaving sources can't leak signals. */
const passesStrictEnd = (operator: types.operatorT<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    const received = [];
    let pullCount = 0;

    // A misbehaving source: it answers a Pull with End followed by a Push
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          if (request !== deriving.pull) return;
          pullCount += 1;
          observer(deriving.end());
          observer(deriving.push(123));
        })
      );
    };

    // Pulls as soon as it is started and records every other signal
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        received.push(signal);
        return;
      }
      deriving.unboxStart(signal)(deriving.pull);
    };

    operator(source)(sink);
    jest.runAllTimers();

    // Only the End may come through; the late Push has to be dropped
    expect(received).toEqual([deriving.end()]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const received = [];

    // A misbehaving source: it still pushes a value after being closed
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          if (request !== deriving.close) return;
          observer(deriving.push(123));
        })
      );
    };

    // Closes as soon as it is started and records every other signal
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        received.push(signal);
        return;
      }
      deriving.unboxStart(signal)(deriving.close);
    };

    operator(source)(sink);
    jest.runAllTimers();

    // The Push sent after the Close has to be dropped
    expect(received).toEqual([]);
  });
};
310
/* This tests an immediately closing operator for End signals to
   the sink and Close signals to the source.
   When an operator closes immediately we expect to see a Close
   signal at the source and an End signal to the sink, since the
   closing operator is expected to end the entire chain.
   `closingOperator` is the operator under test; it is expected to
   terminate the chain synchronously while it is being subscribed to. */
const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closing = 0;
    let ending = 0;

    // Pushes `null` on every Pull and counts the Close signals it receives
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          // For some operator tests we do need to send a single value
          sink(deriving.push(null));
        } else if (tb === deriving.close) {
          closing++;
        }
      }));
    };

    // Pulls once as soon as it is started and counts End signals.
    // A signal is either Start or End, so the branches are an explicit
    // `else if` chain rather than two `if`s sharing a line.
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      } else if (deriving.isEnd(signal)) {
        ending++;
      }
    };

    // We expect the operator to immediately end and close
    closingOperator(source)(sink);
    expect(closing).toBe(1);
    expect(ending).toBe(1);
  });
344
/* Spec: an asynchronous Push followed by an asynchronous End must pass
   through the operator.
   The sink pulls 5ms after being started; in response to the first Pull the
   source schedules a Push after 10ms and an End after 20ms.
   `result` is the value the sink is expected to see (defaults to 0). */
const passesAsyncSequence = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes an async push with an async end (spec)', () => {
    const received = [];
    let hasPushed = false;

    // Schedules a delayed Push and a delayed End for the first Pull only
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(request => {
          if (hasPushed || request !== deriving.pull) return;
          hasPushed = true;
          setTimeout(() => observer(deriving.push(0)), 10);
          setTimeout(() => observer(deriving.end()), 20);
        })
      );
    };

    // Pulls with a small delay once started and records every other signal
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        received.push(signal);
        return;
      }

      setTimeout(() => {
        deriving.unboxStart(signal)(deriving.pull);
      }, 5);
    };

    operator(source)(sink);
    // Nothing may have arrived synchronously
    expect(received.length).toBe(0);

    // After 5ms the sink's delayed Pull should have reached the source
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();

    // Once all timers have run, the Push and then the End should have arrived
    jest.runAllTimers();
    expect(received).toEqual([
      deriving.push(result),
      deriving.end()
    ]);
  });
386
// Every test in this file drives time through Jest's fake timers
// (`jest.runAllTimers` / `jest.advanceTimersByTime`), so enable them per test.
const installFakeTimers = (): void => {
  jest.useFakeTimers();
};

beforeEach(installFakeTimers);
390
describe('combine', () => {
  // Combines a constant `fromValue(0)` source with the source under test;
  // the spec helpers below therefore expect `[0, 0]` tuples.
  const withConstant = (source: types.sourceT<any>) =>
    operators.combine(sources.fromValue(0), source);

  passesPassivePull(withConstant, [0, 0]);
  passesActivePush(withConstant, [0, 0]);
  passesSinkClose(withConstant);
  passesSourceEnd(withConstant, [0, 0]);
  passesSingleStart(withConstant);
  passesStrictEnd(withConstant);

  it('emits the zipped values of two sources', () => {
    const { source: left$, next: emitLeft } = sources.makeSubject();
    const { source: right$, next: emitRight } = sources.makeSubject();
    const listener = jest.fn();

    const combined = operators.combine(left$, right$);
    sinks.forEach(listener)(combined);

    // A value from only one side is not enough to emit
    emitLeft(1);
    expect(listener).not.toHaveBeenCalled();

    // Once both sides have a value the tuple is emitted
    emitRight(2);
    expect(listener).toHaveBeenCalledWith([1, 2]);
  });
});
414
describe('buffer', () => {
  // Notifier for the spec helpers: pushes `null` on every Pull and never ends
  const pullNotifier: types.sourceT<any> = observer =>
    observer(
      deriving.start(request => {
        if (request === deriving.pull) observer(deriving.push(null));
      })
    );

  const buffered = operators.buffer(pullNotifier);

  // Values come out in batches, hence the `[0]` expectations
  passesPassivePull(buffered, [0]);
  passesActivePush(buffered, [0]);
  passesSinkClose(buffered);
  passesSourcePushThenEnd(buffered, [0]);
  passesSingleStart(buffered);
  passesStrictEnd(buffered);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const listener = jest.fn();

    const batched = operators.buffer(notifier$)(input$);
    sinks.forEach(listener)(batched);

    // Values are held back until the notifier fires
    emit(1);
    emit(2);
    expect(listener).not.toHaveBeenCalled();

    notify(null);
    expect(listener).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains values pushed since the last notification
    emit(3);
    notify(null);
    expect(listener).toHaveBeenCalledWith([3]);
  });
});
450
describe('concatMap', () => {
  // Maps every value onto a `fromValue` source of that same value
  const identity = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourcePushThenEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: the same expectations are used for mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const spy = jest.fn();

    const toPair = (x: number) => sources.fromArray([x, x + 1]);
    const flattened = operators.concatMap(toPair)(input$);
    flattened(spy);

    emit(1);
    emit(3);
    finish();

    // Start + four pushed values + End
    expect(spy).toHaveBeenCalledTimes(6);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const received = [];
    const teardown = jest.fn();

    // Records every signal; pulls once and then closes right after Start
    const sink = (signal: types.signalT<any>) => {
      received.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    // The inner source never emits and only exposes its teardown
    const toInner = () => sources.make(() => teardown);
    operators.concatMap(toInner)(sources.fromValue(null))(sink);

    expect(teardown).toHaveBeenCalled();
    expect(received).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: the expected order differs from mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    const toDelayedPair = (x: number) =>
      web.delay(5)(sources.fromArray([x, x * 2]));
    sinks.forEach(listener)(operators.concatMap(toDelayedPair)(outer$));

    // At 14ms only the first inner source's values have been emitted
    jest.advanceTimersByTime(14);
    expect(listener.mock.calls).toEqual([
      [1],
      [2],
    ]);

    // After all timers the second inner source's values follow
    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });

  it('works for fully asynchronous sources', () => {
    const listener = jest.fn();

    // Inner source that emits `1` from a timer callback
    const toTimedSource = () =>
      sources.make(observer => {
        setTimeout(() => observer.next(1));
        return () => {};
      });

    sinks.forEach(listener)(
      operators.concatMap(toTimedSource)(sources.fromValue(null))
    );

    jest.runAllTimers();
    expect(listener).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const collected = [];

    const concatenated = operators.concat([
      sources.fromArray([1, 2]),
      sources.fromArray([3, 4])
    ]);
    sinks.forEach(x => collected.push(x))(concatenated);

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});
560
describe('debounce', () => {
  // Debounce whose duration callback always returns 0
  const passthrough = web.debounce(() => 0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const debounced = web.debounce(() => 100)(input$);
    sinks.forEach(listener)(debounced);

    // 50ms after the first value: nothing yet
    emit(1);
    jest.advanceTimersByTime(50);
    expect(listener).not.toHaveBeenCalled();

    // 99ms after the second value: still nothing
    emit(2);
    jest.advanceTimersByTime(99);
    expect(listener).not.toHaveBeenCalled();

    // One more millisecond and the second value is emitted
    jest.advanceTimersByTime(1);
    expect(listener).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const listener = jest.fn();

    const debounced = web.debounce(() => 100)(input$);
    sinks.forEach(listener)(debounced);

    // Completing right after a value must not swallow that value
    emit(1);
    finish();
    jest.advanceTimersByTime(100);
    expect(listener).toHaveBeenCalled();
  });
});
601
describe('delay', () => {
  // Delay operator created with a timeout of 0
  const passthrough = web.delay(0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('delays outputs by a specified delay timeout value', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const listener = jest.fn();

    const delayed = web.delay(100)(input$);
    sinks.forEach(listener)(delayed);

    // The value is held back at first...
    emit(1);
    expect(listener).not.toHaveBeenCalled();

    // ...and delivered once 100ms have passed
    jest.advanceTimersByTime(100);
    expect(listener).toHaveBeenCalledWith(1);
  });
});
624
describe('filter', () => {
  // Filter whose predicate accepts every value
  const acceptAll = operators.filter(() => true);
  passesPassivePull(acceptAll);
  passesActivePush(acceptAll);
  passesSinkClose(acceptAll);
  passesSourceEnd(acceptAll);
  passesSingleStart(acceptAll);
  passesAsyncSequence(acceptAll);

  it('prevents emissions for which a predicate fails', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const listener = jest.fn();

    const truthyOnly = operators.filter(x => !!x)(input$);
    sinks.forEach(listener)(truthyOnly);

    // A falsy value is filtered out
    emit(false);
    expect(listener).not.toHaveBeenCalled();

    // A truthy value passes
    emit(true);
    expect(listener).toHaveBeenCalledWith(true);
  });
});
647
describe('map', () => {
  // Map with the identity function
  const identity = operators.map(x => x);
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesAsyncSequence(identity);

  it('maps over values given a transform function', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const increment = (x: number) => x + 1;
    sinks.forEach(listener)(operators.map(increment)(input$));

    emit(1);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
667
describe('mergeMap', () => {
  // Maps every value onto a `fromValue` source of that same value
  const identity = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourcePushThenEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: the same expectations are used for concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const spy = jest.fn();

    const toPair = (x: number) => sources.fromArray([x, x + 1]);
    const flattened = operators.mergeMap(toPair)(input$);
    flattened(spy);

    emit(1);
    emit(3);
    finish();

    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const received = [];
    const teardown = jest.fn();

    // Records every signal; pulls once and then closes right after Start
    const sink = (signal: types.signalT<any>) => {
      received.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    // The inner source never emits and only exposes its teardown
    const toInner = () => sources.make(() => teardown);
    operators.mergeMap(toInner)(sources.fromValue(null))(sink);

    expect(teardown).toHaveBeenCalled();
    expect(received).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: the expected order differs from concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    const toDelayedPair = (x: number) =>
      web.delay(5)(sources.fromArray([x, x * 2]));
    sinks.forEach(listener)(operators.mergeMap(toDelayedPair)(outer$));

    // Values of both inner sources are expected to interleave
    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [10],
      [2],
      [20],
    ]);
  });

  it('emits synchronous values in order', () => {
    const collected = [];

    const merged = operators.merge([
      sources.fromArray([1, 2]),
      sources.fromArray([3, 4])
    ]);
    sinks.forEach(x => collected.push(x))(merged);

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});
754
describe('onEnd', () => {
  // onEnd with a callback that does nothing
  const passthrough = operators.onEnd(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesStrictEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('calls a callback when the source ends', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const onEndSpy = jest.fn();

    const observed = operators.onEnd(onEndSpy)(input$);
    sinks.forEach(() => {})(observed);

    // A pushed value must not trigger the callback
    emit(null);
    expect(onEndSpy).not.toHaveBeenCalled();

    // Completing the subject does
    finish();
    expect(onEndSpy).toHaveBeenCalled();
  });
});
778
describe('onPush', () => {
  // onPush with a callback that does nothing
  const passthrough = operators.onPush(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesStrictEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('calls a callback when the source emits', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const onPushSpy = jest.fn();

    const observed = operators.onPush(onPushSpy)(input$);
    sinks.forEach(() => {})(observed);

    // The callback sees every value as it is emitted
    emit(1);
    expect(onPushSpy).toHaveBeenCalledWith(1);
    emit(2);
    expect(onPushSpy).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // `tap` is expected to be the very same function, not just an equivalent
    expect(operators.onPush).toBe(operators.tap);
  });
});
805
describe('onStart', () => {
  // onStart with a callback that does nothing
  const passthrough = operators.onStart(() => {});
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('is called when the source starts', () => {
    let capturedSink: types.sinkT<any>;

    const onStartSpy = jest.fn();
    // Source that only captures its sink, so the test decides when to start
    const source: types.sourceT<any> = incoming => {
      capturedSink = incoming;
    };

    const observed = operators.onStart(onStartSpy)(source);
    sinks.forEach(() => {})(observed);

    // Subscribing alone does not count as starting
    expect(onStartSpy).not.toHaveBeenCalled();

    // Sending the Start signal does
    capturedSink(deriving.start(() => {}));
    expect(onStartSpy).toHaveBeenCalled();
  });
});
829
describe('sample', () => {
  // Notifier for the spec helpers: pushes `null` on every Pull and never ends
  const pullNotifier: types.sourceT<any> = observer =>
    observer(
      deriving.start(request => {
        if (request === deriving.pull) observer(deriving.push(null));
      })
    );

  const sampled = operators.sample(pullNotifier);

  passesPassivePull(sampled);
  passesActivePush(sampled);
  passesSinkClose(sampled);
  passesSourcePushThenEnd(sampled);
  passesSingleStart(sampled);
  passesStrictEnd(sampled);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject();
    const listener = jest.fn();

    const latestOnNotify = operators.sample(notifier$)(input$);
    sinks.forEach(listener)(latestOnNotify);

    // Values are held back until the notifier fires
    emit(1);
    emit(2);
    expect(listener).not.toHaveBeenCalled();

    // Only the most recent value is emitted
    notify(null);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
861
describe('scan', () => {
  // Scan whose reducer discards the accumulator and returns the new value
  const latestValue = operators.scan((_acc, x) => x, null);
  passesPassivePull(latestValue);
  passesActivePush(latestValue);
  passesSinkClose(latestValue);
  passesSourceEnd(latestValue);
  passesSingleStart(latestValue);
  passesAsyncSequence(latestValue);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const sum = (acc: number, x: number) => acc + x;
    const runningTotal = operators.scan(sum, 0)(input$);
    sinks.forEach(listener)(runningTotal);

    // 0 + 1
    emit(1);
    expect(listener).toHaveBeenCalledWith(1);
    // 1 + 2
    emit(2);
    expect(listener).toHaveBeenCalledWith(3);
  });
});
884
describe('share', () => {
  // `share` is itself an operator, so it is passed to the spec helpers directly
  const shared = operators.share;
  passesPassivePull(shared);
  passesActivePush(shared);
  passesSinkClose(shared);
  passesSourceEnd(shared);
  passesSingleStart(shared);
  passesStrictEnd(shared);
  passesAsyncSequence(shared);

  it('shares output values between sinks', () => {
    // Replaced by the source below once it has been subscribed to
    let trigger = () => {};

    const source: types.sourceT<any> = operators.share(observer => {
      observer(deriving.start(() => {}));
      trigger = () => {
        observer(deriving.push([0]));
        observer(deriving.end());
      };
    });

    const listenerA = jest.fn();
    const listenerB = jest.fn();

    sinks.forEach(listenerA)(source);
    sinks.forEach(listenerB)(source);
    trigger();

    expect(listenerA).toHaveBeenCalledWith([0]);
    expect(listenerB).toHaveBeenCalledWith([0]);

    // Both sinks must have received the very same array instance
    const [firstArgA] = listenerA.mock.calls[0];
    const [firstArgB] = listenerB.mock.calls[0];
    expect(firstArgA).toBe(firstArgB);
  });
});
918
describe('skip', () => {
  // Skip operator created with a count of 0
  const skipNone = operators.skip(0);
  passesPassivePull(skipNone);
  passesActivePush(skipNone);
  passesSinkClose(skipNone);
  passesSourceEnd(skipNone);
  passesSingleStart(skipNone);
  passesAsyncSequence(skipNone);

  it('skips a number of values before emitting normally', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const skipFirst = operators.skip(1)(input$);
    sinks.forEach(listener)(skipFirst);

    // The first value is dropped
    emit(1);
    expect(listener).not.toHaveBeenCalled();
    // The second one comes through
    emit(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
940
describe('skipUntil', () => {
  // skipUntil with a `fromValue(null)` notifier
  const passthrough = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);
  passesStrictEnd(passthrough);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const gated = operators.skipUntil(notifier$)(input$);
    sinks.forEach(listener)(gated);

    // Before the notifier fires, values are dropped
    emit(1);
    expect(listener).not.toHaveBeenCalled();

    // Afterwards they come through
    notify(null);
    emit(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
965
describe('skipWhile', () => {
  // skipWhile whose predicate always returns false
  const skipNone = operators.skipWhile(() => false);
  passesPassivePull(skipNone);
  passesActivePush(skipNone);
  passesSinkClose(skipNone);
  passesSourceEnd(skipNone);
  passesSingleStart(skipNone);
  passesAsyncSequence(skipNone);

  it('skips values until one fails a predicate', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const skipUpToOne = operators.skipWhile(x => x <= 1)(input$);
    sinks.forEach(listener)(skipUpToOne);

    // 1 satisfies the predicate and is skipped
    emit(1);
    expect(listener).not.toHaveBeenCalled();
    // 2 fails the predicate and is emitted
    emit(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
987
describe('switchMap', () => {
  // Maps every value onto a `fromValue` source of that same value
  const identity = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourcePushThenEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: the same expectations are used for concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const spy = jest.fn();

    const toPair = (x: number) => sources.fromArray([x, x + 1]);
    const flattened = operators.switchMap(toPair)(input$);
    flattened(spy);

    emit(1);
    emit(3);
    finish();

    // Start + four pushed values + End
    expect(spy).toHaveBeenCalledTimes(6);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Synchronous case: the same expectations are used for concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const received = [];
    const teardown = jest.fn();

    // Records every signal; pulls once and then closes right after Start
    const sink = (signal: types.signalT<any>) => {
      received.push(signal);
      if (!deriving.isStart(signal)) return;
      deriving.unboxStart(signal)(deriving.pull);
      deriving.unboxStart(signal)(deriving.close);
    };

    // The inner source never emits and only exposes its teardown
    const toInner = () => sources.make(() => teardown);
    operators.switchMap(toInner)(sources.fromValue(null))(sink);

    expect(teardown).toHaveBeenCalled();
    expect(received).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // Asynchronous case: the expected output differs from concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    // Inner source: the first two interval ticks `y`, mapped to `x * (y + 1)`
    const toInner = (x: number) => {
      const scale = operators.map((y: number) => x * (y + 1));
      return operators.take(2)(scale(web.interval(5)));
    };

    sinks.forEach(listener)(operators.switchMap(toInner)(outer$));

    // Only one value is expected from the first inner source (x = 1),
    // but both from the second (x = 10)
    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [10],
      [20],
    ]);
  });
});
1061
describe('take', () => {
  // A limit of 10 is more than any spec helper pushes
  const takeMany = operators.take(10);
  passesPassivePull(takeMany);
  passesActivePush(takeMany);
  passesSinkClose(takeMany);
  passesSourceEnd(takeMany);
  passesSingleStart(takeMany);
  passesStrictEnd(takeMany);
  passesAsyncSequence(takeMany);

  // With a limit of 0 the operator is expected to terminate immediately
  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const firstOnly = operators.take(1)(input$);
    firstOnly(spy);
    emit(1);

    // Start, the single allowed Push, then End
    expect(spy).toHaveBeenCalledTimes(3);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});
1089
describe('takeUntil', () => {
  // takeUntil with `sources.never` as the notifier
  const passthrough = operators.takeUntil(sources.never);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourcePushThenEnd(passthrough);
  passesSingleStart(passthrough);
  passesStrictEnd(passthrough);
  passesAsyncSequence(passthrough);

  // With a `fromValue(null)` notifier the operator is expected to end immediately
  const endsImmediately = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(endsImmediately);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<number>();
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const untilNotified = operators.takeUntil(notifier$)(input$);
    untilNotified(spy);
    emit(1);

    // Before the notifier fires: Start and the pushed value
    expect(spy).toHaveBeenCalledTimes(2);
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
    ]);

    // The notifier's emission ends the stream
    notify(null);
    expect(spy).toHaveBeenCalledTimes(3);
    expect(spy.mock.calls[2][0]).toEqual(deriving.end());
  });
});
1122
describe('takeWhile', () => {
  // takeWhile whose predicate always returns true
  const takeAll = operators.takeWhile(() => true);
  passesPassivePull(takeAll);
  passesActivePush(takeAll);
  passesSinkClose(takeAll);
  passesSourceEnd(takeAll);
  passesSingleStart(takeAll);
  passesAsyncSequence(takeAll);

  // With an always-false predicate the operator is expected to end on the first value
  const endsImmediately = operators.takeWhile(() => false);
  passesCloseAndEnd(endsImmediately);

  it('emits values while a predicate passes for all values', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const spy = jest.fn();

    const belowTwo = operators.takeWhile(x => x < 2)(input$);
    belowTwo(spy);
    emit(1);
    emit(2);

    // 1 passes the predicate; 2 fails it, which ends the stream
    expect(spy.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});
1150
describe('takeLast', () => {
  // With a limit of 0 the operator is expected to terminate immediately
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<number>();
    const received = [];

    let talkback;
    // Records every signal, keeps the talkback from Start and pulls after
    // each signal that isn't an End
    const sink: types.sinkT<any> = signal => {
      received.push(signal);
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
      if (!deriving.isEnd(signal)) {
        talkback(deriving.pull);
      }
    };

    operators.takeLast(1)(input$)(sink);

    emit(1);
    emit(2);

    // Nothing reaches the sink while the source is still running
    expect(received.length).toBe(0);
    finish();

    // After completion only the last value is delivered
    expect(received).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(2),
      deriving.end(),
    ]);
  });
});
1180
describe('throttle', () => {
  // Throttle whose duration callback always returns 0
  const passthrough = web.throttle(() => 0);
  passesPassivePull(passthrough);
  passesActivePush(passthrough);
  passesSinkClose(passthrough);
  passesSourceEnd(passthrough);
  passesSingleStart(passthrough);
  passesAsyncSequence(passthrough);

  it('should ignore emissions for a period of time after a value', () => {
    const { source: input$, next: emit } = sources.makeSubject<number>();
    const listener = jest.fn();

    const throttled = web.throttle(() => 100)(input$);
    sinks.forEach(listener)(throttled);

    // The first value passes straight through
    emit(1);
    expect(listener).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    // 50ms later a value is still being ignored
    emit(2);
    expect(listener).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    // After the full 100ms values are let through again
    emit(3);
    expect(listener).toHaveBeenCalledWith(3);
  });
});