1import * as deriving from './helpers/wonka_deriving';
2import * as sources from './wonka_sources.gen';
3import * as sinks from './wonka_sinks.gen';
4import * as operators from './wonka_operators.gen';
5import * as web from './web/wonkaJs.gen';
6import * as types from './wonka_types.gen';
7
/* Spec: a noop operator lets passive Pull talkback signals through.
   The sink sends a Pull upwards; it has to travel through the operator
   to the source, which answers with a single Push that must arrive
   back at the sink.

   operator: the operator under test
   output:   the value the sink is expected to record (defaults to 0) */
const passesPassivePull = (
  operator: types.operatorT<any, any>,
  output: any = 0
) =>
  it('responds to Pull talkback signals (spec)', () => {
    const received = [];
    let hasPushed = false;
    let sinkTalkback = null;

    // Answers only the first Pull it sees, with a single Push of `0`
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        if (hasPushed || signal !== deriving.pull) return;
        hasPushed = true;
        observer(deriving.push(0));
      }));
    };

    // Records pushed values and keeps the talkback; an End fails the test
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        return;
      }
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
      }
    };

    operator(source)(sink);
    // Start has to arrive synchronously ...
    expect(sinkTalkback).not.toBe(null);
    // ... while no value may show up before the sink asks for one
    expect(received).toEqual([]);

    // Pulling must produce exactly the expected value
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(received).toEqual([output]);
  });
50
/* Spec: a noop operator forwards eager (active) Push signals.
   The source pushes a value without having been asked; the value must
   reach the sink through the operator, and the Pull the sink sends in
   response must travel back up to the source.

   operator: the operator under test
   result:   the value the sink is expected to record (defaults to 0) */
const passesActivePush = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('responds to eager Push signals (spec)', () => {
    const received = [];
    let pullCount = 0;
    let emit = null;
    let sinkTalkback = null;

    // Exposes `emit` so the test can push at will; counts incoming Pulls
    const source: types.sourceT<any> = observer => {
      emit = (value: any) => observer(deriving.push(value));
      observer(deriving.start(signal => {
        if (signal === deriving.pull) {
          pullCount += 1;
        }
      }));
    };

    // Answers each Push with a Pull; an End fails the test
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) {
        received.push(deriving.unboxPush(signal));
        sinkTalkback(deriving.pull);
      }
    };

    operator(source)(sink);
    // Nothing has been pushed yet, so nothing may have been pulled
    expect(pullCount).toBe(0);

    // The pushed value must arrive at the sink ...
    emit(0);
    jest.runAllTimers();
    expect(received).toEqual([result]);
    // ... and the sink's Pull must have reached the source
    expect(pullCount).toBe(1);
  });
94
/* Spec: a noop operator forwards Close talkback signals from the sink.
   The sink closes as soon as it receives a Push; the Close must reach
   the source and no End signal may be delivered to the sink.

   operator: the operator under test */
const passesSinkClose = (operator: types.operatorT<any, any>) =>
  it('responds to Close signals from sink (spec)', () => {
    let pullCount = 0;
    let closeCount = 0;
    let sinkTalkback = null;

    // Counts Pull and Close signals; pushes `0` on Pull while not closed
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        if (signal === deriving.pull) {
          pullCount += 1;
          if (closeCount === 0) observer(deriving.push(0));
        }

        if (signal === deriving.close) {
          closeCount += 1;
        }
      }));
    };

    // Closes upon the first Push; an End fails the test
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      if (deriving.isPush(signal)) {
        sinkTalkback(deriving.close);
      }
    };

    operator(source)(sink);

    // One Pull triggers one Push, which the sink answers with one Close
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(closeCount).toBe(1);
    expect(pullCount).toBe(1);
  });
132
/* Spec: a noop operator passes on End signals from the source.
   After the sink's first Pull the source sends a Push followed by an
   End. The operator must not react with additional Pull or Close
   talkback signals.

   operator: the operator under test
   result:   the value expected inside the forwarded Push (defaults to 0) */
const passesSourceEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on End signals from source (spec)', () => {
    const observed = [];
    let endCount = 0;
    let pullCount = 0;
    let sinkTalkback = null;

    // Must never be closed; emits Push + End once the pull count is 1
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        expect(signal).not.toBe(deriving.close);
        if (signal === deriving.pull) {
          pullCount += 1;
        }
        if (pullCount === 1) {
          observer(deriving.push(0));
          observer(deriving.end());
        }
      }));
    };

    // Keeps the talkback and records every signal other than Start
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
        return;
      }
      observed.push(signal);
      if (deriving.isEnd(signal)) {
        endCount += 1;
      }
    };

    operator(source)(sink);

    // A single Pull must yield exactly one Push followed by one End
    sinkTalkback(deriving.pull);
    jest.runAllTimers();
    expect(pullCount).toBe(1);
    expect(endCount).toBe(1);
    expect(observed).toEqual([deriving.push(result), deriving.end()]);
  });
176
/* Spec: a noop operator emits exactly one Start signal.
   Once the source starts the operator's sink, the operator must send
   one Start signal to the downstream sink and never more than one.

   operator: the operator under test */
const passesSingleStart = (operator: types.operatorT<any, any>) =>
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    // Starts immediately with a talkback that ignores every signal
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(() => {}));
    };

    // Counts only Start signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        startCount += 1;
      }
    };

    // Subscribing must result in exactly one Start on the sink
    operator(source)(sink);
    expect(startCount).toBe(1);
  });
197
/* Spec (strict): a noop operator stays silent after End or Close.
   Once the operator has received an End from the source, or a Close
   from the sink, it should not forward any further signals to the sink.
   This is not a strict requirement for every operator, but operators
   that close sources themselves or work with several sources should
   guard against badly behaved sources this way.

   operator: the operator under test */
const passesStrictEnd = (operator: types.operatorT<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    const observed = [];
    let pullCount = 0;

    // Misbehaving source: on Pull it sends End and then still pushes
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        if (signal !== deriving.pull) return;
        pullCount += 1;
        observer(deriving.end());
        observer(deriving.push(123));
      }));
    };

    // Pulls right away on Start and records everything else
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
        return;
      }
      observed.push(signal);
    };

    operator(source)(sink);

    // Only the End may have come through; the late Push must be dropped
    jest.runAllTimers();
    expect(observed).toEqual([deriving.end()]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const observed = [];

    // Misbehaving source: answers a Close with a Push
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        if (signal === deriving.close) {
          observer(deriving.push(123));
        }
      }));
    };

    // Closes right away on Start and records everything else
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.close);
        return;
      }
      observed.push(signal);
    };

    operator(source)(sink);

    // Nothing may have come through; the late Push must be dropped
    jest.runAllTimers();
    expect(observed).toEqual([]);
  });
};
261
/* Spec (ending operator): an immediately closing operator ends the chain.
   When the operator closes right away we expect the source to receive
   a Close signal and the sink to receive an End signal, as the closing
   operator is responsible for terminating both directions.

   closingOperator: an operator configured so that it ends immediately */
const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let endCount = 0;
    let closeCount = 0;

    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        // Some operators only end once they have seen a value
        if (signal === deriving.pull) {
          observer(deriving.push(null));
        }
        if (signal === deriving.close) {
          closeCount += 1;
        }
      }));
    };

    // Pulls once on Start and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      }

      if (deriving.isEnd(signal)) {
        endCount += 1;
      }
    };

    // Both the Close and the End are expected synchronously
    closingOperator(source)(sink);
    expect(closeCount).toBe(1);
    expect(endCount).toBe(1);
  });
295
/* Spec: a noop operator passes an asynchronous Push followed by an
   asynchronous End. The sink pulls 5ms after Start; the source answers
   the first Pull with a Push after 10ms and an End after 20ms. Both
   signals must reach the sink in that order.

   operator: the operator under test
   result:   the value expected inside the forwarded Push (defaults to 0) */
const passesAsyncSequence = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes an async push with an async end (spec)', () => {
    const observed = [];
    let hasPushed = false;

    // Schedules a delayed Push and a later End upon the first Pull only
    const source: types.sourceT<any> = observer => {
      observer(deriving.start(signal => {
        if (signal !== deriving.pull || hasPushed) return;
        hasPushed = true;
        setTimeout(() => observer(deriving.push(0)), 10);
        setTimeout(() => observer(deriving.end()), 20);
      }));
    };

    // Defers its Pull by 5ms and records every signal other than Start
    const sink: types.sinkT<any> = signal => {
      if (!deriving.isStart(signal)) {
        observed.push(signal);
        return;
      }

      setTimeout(() => {
        deriving.unboxStart(signal)(deriving.pull);
      }, 5);
    };

    // Nothing is observed synchronously; after 5ms the Pull has reached
    // the source, and once all timers ran the Push and End have arrived
    operator(source)(sink);
    expect(observed.length).toBe(0);
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    jest.runAllTimers();

    expect(observed).toEqual([
      deriving.push(result),
      deriving.end()
    ]);
  });
337
// Every test runs on Jest's fake timers so timing can be driven manually
beforeEach(function () {
  jest.useFakeTimers();
});
341
describe('combine', () => {
  // Combines the tested source with a constant `0` source, which is why
  // the specs below expect `[0, 0]` instead of a bare `0`
  const withConstant = (source: types.sourceT<any>) =>
    operators.combine(sources.fromValue(0), source);

  passesPassivePull(withConstant, [0, 0]);
  passesActivePush(withConstant, [0, 0]);
  passesSinkClose(withConstant);
  passesSourceEnd(withConstant, [0, 0]);
  passesSingleStart(withConstant);
  passesStrictEnd(withConstant);

  it('emits the zipped values of two sources', () => {
    const { source: left$, next: pushLeft } = sources.makeSubject();
    const { source: right$, next: pushRight } = sources.makeSubject();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.combine(left$, right$));

    // Nothing is emitted until both sides have produced a value
    pushLeft(1);
    expect(listener).not.toHaveBeenCalled();
    pushRight(2);
    expect(listener).toHaveBeenCalledWith([1, 2]);
  });
});
365
describe('buffer', () => {
  // Notifier that pushes `null` whenever it is pulled and never ends
  const notifier: types.sourceT<any> = observer =>
    observer(deriving.start(signal => {
      if (signal === deriving.pull) {
        observer(deriving.push(null));
      }
    }));

  const buffered = operators.buffer(notifier);

  // Values are emitted in batches, hence `[0]` rather than `0`
  passesPassivePull(buffered, [0]);
  passesActivePush(buffered, [0]);
  passesSinkClose(buffered);
  passesSourceEnd(buffered, [0]);
  passesSingleStart(buffered);
  passesStrictEnd(buffered);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: push } = sources.makeSubject();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.buffer(notifier$)(input$));

    // Values are held back until the notifier fires
    push(1);
    push(2);
    expect(listener).not.toHaveBeenCalled();

    notify(null);
    expect(listener).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains values pushed since the last one
    push(3);
    notify(null);
    expect(listener).toHaveBeenCalledWith([3]);
  });
});
401
describe('concatMap', () => {
  // Mapping each value to a single-value source makes concatMap a noop
  const identity = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: same expectations as for mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const listener = jest.fn();

    const flattened$ = operators.concatMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(listener);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(listener).toHaveBeenCalledTimes(6);
    expect(listener.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Asynchronous case: this is where concatMap differs from mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(
      operators.concatMap(
        (x: number) => web.delay(5)(sources.fromArray([x, x * 2]))
      )(source)
    );

    // After 14ms only the first inner source has been emitted
    jest.advanceTimersByTime(14);
    expect(listener.mock.calls).toEqual([
      [1],
      [2],
    ]);

    // Afterwards the second inner source follows
    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });

  it('emits synchronous values in order', () => {
    const collected = [];

    const subscribe = sinks.forEach(x => collected.push(x));
    subscribe(
      operators.concat([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});
473
describe('debounce', () => {
  // A debounce duration of 0ms is used as the noop configuration
  const noop = web.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.debounce(() => 100)(source));

    next(1);
    jest.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    // A second value within the window restarts the 100ms wait
    next(2);
    jest.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.debounce(() => 100)(source));

    next(1);
    complete();
    jest.advanceTimersByTime(100);
    // `1` is the only value pushed into the subject, so assert on it
    // explicitly rather than merely checking that the callback ran
    expect(fn).toHaveBeenCalledWith(1);
  });
});
514
describe('delay', () => {
  // A delay of 0ms is used as the noop configuration
  const zeroDelay = web.delay(0);
  passesPassivePull(zeroDelay);
  passesActivePush(zeroDelay);
  passesSinkClose(zeroDelay);
  passesSourceEnd(zeroDelay);
  passesSingleStart(zeroDelay);
  passesAsyncSequence(zeroDelay);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(web.delay(100)(source));

    // The value must not arrive synchronously ...
    next(1);
    expect(listener).not.toHaveBeenCalled();

    // ... but only once 100ms have passed
    jest.advanceTimersByTime(100);
    expect(listener).toHaveBeenCalledWith(1);
  });
});
537
describe('filter', () => {
  // An always-true predicate is used as the noop configuration
  const acceptAll = operators.filter(() => true);
  passesPassivePull(acceptAll);
  passesActivePush(acceptAll);
  passesSinkClose(acceptAll);
  passesSourceEnd(acceptAll);
  passesSingleStart(acceptAll);
  passesAsyncSequence(acceptAll);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.filter(x => !!x)(source));

    // A falsy value is rejected by the predicate
    next(false);
    expect(listener).not.toHaveBeenCalled();

    // A truthy value is let through
    next(true);
    expect(listener).toHaveBeenCalledWith(true);
  });
});
560
describe('map', () => {
  // Mapping with the identity function is the noop configuration
  const identity = operators.map(x => x);
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesAsyncSequence(identity);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.map((x: number) => x + 1)(source));

    // 1 is incremented to 2 on its way to the sink
    next(1);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
580
describe('mergeMap', () => {
  // Mapping each value to a single-value source makes mergeMap a noop
  const identity = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: same expectations as for concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const listener = jest.fn();

    const flattened$ = operators.mergeMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(listener);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(listener).toHaveBeenCalledTimes(6);
    expect(listener.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Asynchronous case: this is where mergeMap differs from concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(
      operators.mergeMap(
        (x: number) => web.delay(5)(sources.fromArray([x, x * 2]))
      )(source)
    );

    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });

  it('emits synchronous values in order', () => {
    const collected = [];

    const subscribe = sinks.forEach(x => collected.push(x));
    subscribe(
      operators.merge([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(collected).toEqual([ 1, 2, 3, 4 ]);
  });
});
646
describe('onEnd', () => {
  // An empty callback is used as the noop configuration
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    // No `<number>` type argument here: the value pushed below is `null`,
    // which would contradict a subject declared to carry numbers
    const { source, next, complete } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    // A regular value must not trigger the callback ...
    next(null);
    expect(fn).not.toHaveBeenCalled();

    // ... only the end of the source does
    complete();
    expect(fn).toHaveBeenCalled();
  });
});
669
describe('onPush', () => {
  // An empty callback is used as the noop configuration
  const silentTap = operators.onPush(() => {});
  passesPassivePull(silentTap);
  passesActivePush(silentTap);
  passesSinkClose(silentTap);
  passesSourceEnd(silentTap);
  passesSingleStart(silentTap);
  passesAsyncSequence(silentTap);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const onValue = jest.fn();

    const subscribe = sinks.forEach(() => {});
    subscribe(operators.onPush(onValue)(source));

    // The callback sees every pushed value
    next(1);
    expect(onValue).toHaveBeenCalledWith(1);
    next(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // `tap` is expected to be the very same function, not a copy
    expect(operators.onPush).toBe(operators.tap);
  });
});
695
describe('onStart', () => {
  // An empty callback is used as the noop configuration
  const silentStart = operators.onStart(() => {});
  passesPassivePull(silentStart);
  passesActivePush(silentStart);
  passesSinkClose(silentStart);
  passesSourceEnd(silentStart);
  passesSingleStart(silentStart);
  passesAsyncSequence(silentStart);

  it('is called when the source starts', () => {
    let capturedSink: types.sinkT<any>;

    const onStarted = jest.fn();
    // Only captures the sink so the test decides when Start is sent
    const source: types.sourceT<any> = incoming => {
      capturedSink = incoming;
    };

    const subscribe = sinks.forEach(() => {});
    subscribe(operators.onStart(onStarted)(source));

    // Subscribing alone must not trigger the callback
    expect(onStarted).not.toHaveBeenCalled();

    // The Start signal does
    capturedSink(deriving.start(() => {}));
    expect(onStarted).toHaveBeenCalled();
  });
});
719
describe('sample', () => {
  // Notifier that pushes `null` whenever it is pulled and never ends
  const notifier: types.sourceT<any> = observer =>
    observer(deriving.start(signal => {
      if (signal === deriving.pull) {
        observer(deriving.push(null));
      }
    }));

  const sampled = operators.sample(notifier);

  passesPassivePull(sampled);
  passesActivePush(sampled);
  passesSinkClose(sampled);
  passesSourceEnd(sampled);
  passesSingleStart(sampled);
  passesStrictEnd(sampled);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: push } = sources.makeSubject();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.sample(notifier$)(input$));

    // Values are held back until the notifier fires
    push(1);
    push(2);
    expect(listener).not.toHaveBeenCalled();

    // Only the most recent value is emitted
    notify(null);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
751
describe('scan', () => {
  // A reducer that always returns the incoming value is the noop configuration
  const latest = operators.scan((_acc, x) => x, null);
  passesPassivePull(latest);
  passesActivePush(latest);
  passesSinkClose(latest);
  passesSourceEnd(latest);
  passesSingleStart(latest);
  passesAsyncSequence(latest);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const sum = (acc: number, x: number) => acc + x;
    const subscribe = sinks.forEach(listener);
    subscribe(operators.scan(sum, 0)(input$));

    // 0 + 1 = 1, then 1 + 2 = 3
    next(1);
    expect(listener).toHaveBeenCalledWith(1);
    next(2);
    expect(listener).toHaveBeenCalledWith(3);
  });
});
774
describe('share', () => {
  // `share` takes no configuration, so it is passed to the specs as-is
  const shared = operators.share;
  passesPassivePull(shared);
  passesActivePush(shared);
  passesSinkClose(shared);
  passesSourceEnd(shared);
  passesSingleStart(shared);
  passesStrictEnd(shared);
  passesAsyncSequence(shared);

  it('shares output values between sinks', () => {
    let emit = () => {};

    // On subscription, `emit` is replaced by a function that pushes one
    // array value and then ends the source
    const source: types.sourceT<any> = operators.share(observer => {
      observer(deriving.start(() => {}));
      emit = () => {
        observer(deriving.push([0]));
        observer(deriving.end());
      };
    });

    const first = jest.fn();
    const second = jest.fn();

    sinks.forEach(first)(source);
    sinks.forEach(second)(source);
    emit();

    expect(first).toHaveBeenCalledWith([0]);
    expect(second).toHaveBeenCalledWith([0]);
    // Both sinks must have received the identical array instance
    expect(first.mock.calls[0][0]).toBe(second.mock.calls[0][0]);
  });
});
808
describe('skip', () => {
  // Skipping zero values is the noop configuration
  const skipNone = operators.skip(0);
  passesPassivePull(skipNone);
  passesActivePush(skipNone);
  passesSinkClose(skipNone);
  passesSourceEnd(skipNone);
  passesSingleStart(skipNone);
  passesAsyncSequence(skipNone);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.skip(1)(source));

    // The first value is swallowed, the second one passes
    next(1);
    expect(listener).not.toHaveBeenCalled();
    next(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
830
describe('skipUntil', () => {
  // A notifier built with `fromValue(null)` is the noop configuration
  const skipNothing = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(skipNothing);
  passesActivePush(skipNothing);
  passesSinkClose(skipNothing);
  passesSourceEnd(skipNothing);
  passesSingleStart(skipNothing);
  passesAsyncSequence(skipNothing);
  passesStrictEnd(skipNothing);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next: push } = sources.makeSubject<number>();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.skipUntil(notifier$)(input$));

    // Values before the notification are dropped
    push(1);
    expect(listener).not.toHaveBeenCalled();
    // Values after it are passed on
    notify(null);
    push(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
855
describe('skipWhile', () => {
  // An always-false predicate is the noop configuration
  const skipNothing = operators.skipWhile(() => false);
  passesPassivePull(skipNothing);
  passesActivePush(skipNothing);
  passesSinkClose(skipNothing);
  passesSourceEnd(skipNothing);
  passesSingleStart(skipNothing);
  passesAsyncSequence(skipNothing);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(operators.skipWhile(x => x <= 1)(source));

    // 1 satisfies `x <= 1` and is skipped; 2 does not and is emitted
    next(1);
    expect(listener).not.toHaveBeenCalled();
    next(2);
    expect(listener).toHaveBeenCalledWith(2);
  });
});
877
describe('switchMap', () => {
  // Mapping each value to a single-value source makes switchMap a noop
  const identity = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(identity);
  passesActivePush(identity);
  passesSinkClose(identity);
  passesSourceEnd(identity);
  passesSingleStart(identity);
  passesStrictEnd(identity);
  passesAsyncSequence(identity);

  // Synchronous case: same expectations as for concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const listener = jest.fn();

    const flattened$ = operators.switchMap(
      (x: number) => sources.fromArray([x, x + 1])
    )(source);
    flattened$(listener);

    next(1);
    next(3);
    complete();

    // One Start, four Pushes and one End
    expect(listener).toHaveBeenCalledTimes(6);
    expect(listener.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // Asynchronous case: this is where switchMap differs from concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(
      operators.switchMap((x: number) => {
        // Inner source: the first two interval ticks, scaled by `x`
        const firstTwo = operators.take(2);
        return firstTwo(
          operators.map((y: number) => x * (y + 1))(web.interval(5))
        );
      })(source)
    );

    // The expected output lacks the second value of the first inner source
    jest.runAllTimers();
    expect(listener.mock.calls).toEqual([
      [1],
      [10],
      [20],
    ]);
  });
});
929
describe('take', () => {
  // A limit of 10 is never reached by the specs, making this a noop
  const takeMany = operators.take(10);
  passesPassivePull(takeMany);
  passesActivePush(takeMany);
  passesSinkClose(takeMany);
  passesSourceEnd(takeMany);
  passesSingleStart(takeMany);
  passesStrictEnd(takeMany);
  passesAsyncSequence(takeMany);

  // A limit of 0 is the immediately ending configuration
  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const limited$ = operators.take(1)(source);
    limited$(listener);
    next(1);

    // Start, the single permitted Push, then End
    expect(listener).toHaveBeenCalledTimes(3);
    expect(listener.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});
957
describe('takeUntil', () => {
  // A notifier built with `sources.never` is the noop configuration
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // A notifier built with `fromValue(null)` is the ending configuration
  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    // The notifier is deliberately left without a `<number>` type
    // argument: it is fired with `null` below, as in the skipUntil,
    // sample and buffer tests
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    // Before the notification values pass through normally
    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
    ]);

    // The notification ends the output
    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(deriving.end());
  });
});
990
describe('takeWhile', () => {
  // An always-true predicate is the noop configuration
  const takeAll = operators.takeWhile(() => true);
  passesPassivePull(takeAll);
  passesActivePush(takeAll);
  passesSinkClose(takeAll);
  passesSourceEnd(takeAll);
  passesSingleStart(takeAll);
  passesAsyncSequence(takeAll);

  // An always-false predicate is the immediately ending configuration
  const takeNone = operators.takeWhile(() => false);
  passesCloseAndEnd(takeNone);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const limited$ = operators.takeWhile(x => x < 2)(source);
    limited$(listener);
    next(1);
    next(2);

    // 1 passes `x < 2`; 2 fails it, which is expected to yield End instead
    expect(listener.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});
1018
describe('takeLast', () => {
  // A limit of 0 is the immediately ending configuration
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const observed = [];

    let sinkTalkback;
    const lastOne$ = operators.takeLast(1)(source);
    // Records every signal and keeps pulling until End arrives
    lastOne$(signal => {
      observed.push(signal);
      if (deriving.isStart(signal)) {
        sinkTalkback = deriving.unboxStart(signal);
      }
      if (!deriving.isEnd(signal)) {
        sinkTalkback(deriving.pull);
      }
    });

    next(1);
    next(2);

    // No signal at all is expected before the source has completed
    expect(observed.length).toBe(0);
    complete();

    // Afterwards only the last value is delivered
    expect(observed).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(2),
      deriving.end(),
    ]);
  });
});
1048
describe('throttle', () => {
  // A throttle duration of 0ms is used as the noop configuration
  const unthrottled = web.throttle(() => 0);
  passesPassivePull(unthrottled);
  passesActivePush(unthrottled);
  passesSinkClose(unthrottled);
  passesSourceEnd(unthrottled);
  passesSingleStart(unthrottled);
  passesAsyncSequence(unthrottled);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const listener = jest.fn();

    const subscribe = sinks.forEach(listener);
    subscribe(web.throttle(() => 100)(source));

    // The first value passes immediately
    next(1);
    expect(listener).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    // 50ms later the window is still open, so this value is dropped
    next(2);
    expect(listener).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    // After the full 100ms values pass again
    next(3);
    expect(listener).toHaveBeenCalledWith(3);
  });
});