1import * as deriving from './helpers/wonka_deriving';
2import * as sources from './wonka_sources.gen';
3import * as sinks from './wonka_sinks.gen';
4import * as operators from './wonka_operators.gen';
5import * as web from './web/wonkaJs.gen';
6import * as types from './wonka_types.gen';
7
8/* This tests a noop operator for passive Pull talkback signals.
9 A Pull will be sent from the sink upwards and should pass through
10 the operator until the source receives it, which then pushes a
11 value down. */
12const passesPassivePull = (
13 operator: types.operatorT<any, any>,
14 output: any = 0
15) =>
16 it('responds to Pull talkback signals (spec)', () => {
17 let talkback = null;
18 const values = [];
19
20 const source: types.sourceT<any> = sink => {
21 sink(deriving.start(tb => {
22 if (tb === deriving.pull)
23 sink(deriving.push(0));
24 }));
25 };
26
27 const sink: types.sinkT<any> = signal => {
28 expect(deriving.isEnd(signal)).toBeFalsy();
29 if (deriving.isPush(signal)) {
30 values.push(deriving.unboxPush(signal));
31 } else if (deriving.isStart(signal)) {
32 talkback = deriving.unboxStart(signal);
33 }
34 };
35
36 operator(source)(sink);
37 // The Start signal should always come in immediately
38 expect(talkback).not.toBe(null);
39 // No Push signals should be issued initially
40 expect(values).toEqual([]);
41
42 // When pulling a value we expect an immediate response
43 talkback(deriving.pull);
44 jest.runAllTimers();
45 expect(values).toEqual([output]);
46 });
47
48/* This tests a noop operator for regular, active Push signals.
49 A Push will be sent downwards from the source, through the
50 operator to the sink. Pull events should be let through from
51 the sink after every Push event. */
52const passesActivePush = (
53 operator: types.operatorT<any, any>,
54 result: any = 0
55) =>
56 it('responds to eager Push signals (spec)', () => {
57 const values = [];
58 let talkback = null;
59 let push = null;
60 let pulls = 0;
61
62 const source: types.sourceT<any> = sink => {
63 push = (value: any) => sink(deriving.push(value));
64 sink(deriving.start(tb => {
65 if (tb === deriving.pull)
66 pulls++;
67 }));
68 };
69
70 const sink: types.sinkT<any> = signal => {
71 expect(deriving.isEnd(signal)).toBeFalsy();
72 if (deriving.isStart(signal)) {
73 talkback = deriving.unboxStart(signal);
74 } else if (deriving.isPush(signal)) {
75 values.push(deriving.unboxPush(signal));
76 talkback(deriving.pull);
77 }
78 };
79
80 operator(source)(sink);
81 // No Pull signals should be issued initially
82 expect(pulls).toBe(0);
83
84 // When pushing a value we expect an immediate response
85 push(0);
86 jest.runAllTimers();
87 expect(values).toEqual([result]);
88 // Subsequently the Pull signal should have travelled upwards
89 expect(pulls).toBe(1);
90 });
91
92/* This tests a noop operator for Close talkback signals from the sink.
93 A Close signal will be sent, which should be forwarded to the source,
94 which then ends the communication without sending an End signal. */
95const passesSinkClose = (operator: types.operatorT<any, any>) =>
96 it('responds to Close signals from sink (spec)', () => {
97 let talkback = null;
98 let closing = 0;
99 let pulls = 0;
100
101 const source: types.sourceT<any> = sink => {
102 sink(deriving.start(tb => {
103 if (tb === deriving.pull) {
104 pulls++;
105 if (!closing) sink(deriving.push(0));
106 } if (tb === deriving.close) {
107 closing++;
108 }
109 }));
110 };
111
112 const sink: types.sinkT<any> = signal => {
113 expect(deriving.isEnd(signal)).toBeFalsy();
114 if (deriving.isStart(signal)) {
115 talkback = deriving.unboxStart(signal);
116 } else if (deriving.isPush(signal)) {
117 talkback(deriving.close);
118 }
119 };
120
121 operator(source)(sink);
122
123 // When pushing a value we expect an immediate close signal
124 talkback(deriving.pull);
125 jest.runAllTimers();
126 expect(closing).toBe(1);
127 expect(pulls).toBe(1);
128 });
129
130/* This tests a noop operator for End signals from the source.
131 A Push and End signal will be sent after the first Pull talkback
132 signal from the sink, which shouldn't lead to any extra Close or Pull
133 talkback signals. */
134const passesSourceEnd = (
135 operator: types.operatorT<any, any>,
136 result: any = 0
137) =>
138 it('passes on End signals from source (spec)', () => {
139 const signals = [];
140 let talkback = null;
141 let pulls = 0;
142 let ending = 0;
143
144 const source: types.sourceT<any> = sink => {
145 sink(deriving.start(tb => {
146 expect(tb).not.toBe(deriving.close);
147 if (tb === deriving.pull) pulls++;
148 if (pulls === 1) {
149 sink(deriving.push(0));
150 sink(deriving.end());
151 }
152 }));
153 };
154
155 const sink: types.sinkT<any> = signal => {
156 if (deriving.isStart(signal)) {
157 talkback = deriving.unboxStart(signal);
158 } else {
159 signals.push(signal);
160 if (deriving.isEnd(signal)) ending++;
161 }
162 };
163
164 operator(source)(sink);
165
166 // When pushing a value we expect an immediate Push then End signal
167 talkback(deriving.pull);
168 jest.runAllTimers();
169 expect(pulls).toBe(1);
170 expect(ending).toBe(1);
171 expect(signals).toEqual([deriving.push(result), deriving.end()]);
172 });
173
174/* This tests a noop operator for Start signals from the source.
175 When the operator's sink is started by the source it'll receive
176 a Start event. As a response it should never send more than one
177 Start signals to the sink. */
178const passesSingleStart = (operator: types.operatorT<any, any>) =>
179 it('sends a single Start event to the incoming sink (spec)', () => {
180 let start = 0;
181
182 const source: types.sourceT<any> = sink => {
183 sink(deriving.start(() => {}));
184 };
185
186 const sink: types.sinkT<any> = signal => {
187 if (deriving.isStart(signal)) start++;
188 };
189
190 // When starting the operator we expect a single start event on the sink
191 operator(source)(sink);
192 expect(start).toBe(1);
193 });
194
195/* This tests a noop operator for silence after End signals from the source.
196 When the operator receives the End signal it shouldn't forward any other
197 signals to the sink anymore.
198 This isn't a strict requirement, but some operators should ensure that
199 all sources are well behaved. This is particularly true for operators
200 that either Close sources themselves or may operate on multiple sources. */
201const passesStrictEnd = (operator: types.operatorT<any, any>) =>
202 it('stops all signals after End has been received (spec: strict end)', () => {
203 let pulls = 0;
204 const signals = [];
205
206 const source: types.sourceT<any> = sink => {
207 sink(deriving.start(tb => {
208 if (tb === deriving.pull) {
209 pulls++;
210 sink(deriving.end());
211 sink(deriving.push(0));
212 }
213 }));
214 };
215
216 const sink: types.sinkT<any> = signal => {
217 if (deriving.isStart(signal)) {
218 deriving.unboxStart(signal)(deriving.pull);
219 } else {
220 signals.push(signal);
221 }
222 };
223
224 operator(source)(sink);
225
226 // The Push signal should've been dropped
227 jest.runAllTimers();
228 expect(signals).toEqual([deriving.end()]);
229 expect(pulls).toBe(1);
230 });
231
232/* This tests an immediately closing operator for End signals to
233 the sink and Close signals to the source.
234 When an operator closes immediately we expect to see a Close
235 signal at the source and an End signal to the sink, since the
236 closing operator is expected to end the entire chain. */
237const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
238 it('closes the source and ends the sink correctly (spec: ending operator)', () => {
239 let closing = 0;
240 let ending = 0;
241
242 const source: types.sourceT<any> = sink => {
243 sink(deriving.start(tb => {
244 // For some operator tests we do need to send a single value
245 if (tb === deriving.pull)
246 sink(deriving.push(null));
247 if (tb === deriving.close)
248 closing++;
249 }));
250 };
251
252 const sink: types.sinkT<any> = signal => {
253 if (deriving.isStart(signal)) {
254 deriving.unboxStart(signal)(deriving.pull);
255 } if (deriving.isEnd(signal)) {
256 ending++;
257 }
258 };
259
260 // We expect the operator to immediately end and close
261 closingOperator(source)(sink);
262 expect(closing).toBe(1);
263 expect(ending).toBe(1);
264 });
265
266const passesAsyncSequence = (
267 operator: types.operatorT<any, any>,
268 result: any = 0
269) =>
270 it('passes an async push with an async end (spec)', () => {
271 let hasPushed = false;
272 const signals = [];
273
274 const source: types.sourceT<any> = sink => {
275 sink(deriving.start(tb => {
276 if (tb === deriving.pull && !hasPushed) {
277 hasPushed = true;
278 setTimeout(() => sink(deriving.push(0)), 10);
279 setTimeout(() => sink(deriving.end()), 20);
280 }
281 }));
282 };
283
284 const sink: types.sinkT<any> = signal => {
285 if (deriving.isStart(signal)) {
286 setTimeout(() => {
287 deriving.unboxStart(signal)(deriving.pull);
288 }, 5);
289 } else {
290 signals.push(signal);
291 }
292 };
293
294 // We initially expect to see the push signal
295 // Afterwards after all timers all other signals come in
296 operator(source)(sink);
297 expect(signals.length).toBe(0);
298 jest.advanceTimersByTime(5);
299 expect(hasPushed).toBeTruthy();
300 jest.runAllTimers();
301
302 expect(signals).toEqual([
303 deriving.push(result),
304 deriving.end()
305 ]);
306 });
307
308beforeEach(() => {
309 jest.useFakeTimers();
310});
311
312describe('combine', () => {
313 const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source);
314
315 passesPassivePull(noop, [0, 0]);
316 // TODO: passesActivePush(noop, [0, 0]);
317 passesSinkClose(noop);
318 passesSourceEnd(noop, [0, 0]);
319 passesSingleStart(noop);
320 passesStrictEnd(noop);
321
322 it('emits the zipped values of two sources', () => {
323 const { source: sourceA, next: nextA } = sources.makeSubject();
324 const { source: sourceB, next: nextB } = sources.makeSubject();
325 const fn = jest.fn();
326
327 sinks.forEach(fn)(operators.combine(sourceA, sourceB));
328
329 nextA(1);
330 expect(fn).not.toHaveBeenCalled();
331 nextB(2);
332 expect(fn).toHaveBeenCalledWith([1, 2]);
333 });
334});
335
336describe('buffer', () => {
337 const noop = operators.buffer(
338 operators.merge([
339 sources.fromValue(null),
340 sources.never
341 ])
342 );
343
344 // TODO: passesPassivePull(noop, [0]);
345 // TODO: passesActivePush(noop);
346 // TODO: passesSinkClose(noop);
347 // TODO: passesSourceEnd(noop);
348 passesSingleStart(noop);
349 // TODO: passesStrictEnd(noop);
350
351 it('emits batches of input values when a notifier emits', () => {
352 const { source: notifier$, next: notify } = sources.makeSubject();
353 const { source: input$, next } = sources.makeSubject();
354 const fn = jest.fn();
355
356 sinks.forEach(fn)(operators.buffer(notifier$)(input$));
357
358 next(1);
359 next(2);
360 expect(fn).not.toHaveBeenCalled();
361
362 notify(null);
363 expect(fn).toHaveBeenCalledWith([1, 2]);
364 });
365});
366
367describe('concatMap', () => {
368 const noop = operators.concatMap(x => sources.fromValue(x));
369 // TODO: passesPassivePull(noop);
370 // TODO: passesActivePush(noop);
371 // TODO: passesSinkClose(noop);
372 // TODO: passesSourceEnd(noop);
373 passesSingleStart(noop);
374 passesStrictEnd(noop);
375 passesAsyncSequence(noop);
376
377 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
378 it('emits values from each flattened synchronous source', () => {
379 const { source, next, complete } = sources.makeSubject<number>();
380 const fn = jest.fn();
381
382 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
383
384 next(1);
385 next(3);
386 complete();
387
388 expect(fn).toHaveBeenCalledTimes(6);
389 expect(fn.mock.calls).toEqual([
390 [deriving.start(expect.any(Function))],
391 [deriving.push(1)],
392 [deriving.push(2)],
393 [deriving.push(3)],
394 [deriving.push(4)],
395 [deriving.end()],
396 ]);
397 });
398
399 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
400 it('emits values from each flattened asynchronous source, one at a time', () => {
401 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
402 const fn = jest.fn();
403
404 sinks.forEach(fn)(
405 operators.concatMap((x: number) => {
406 return web.delay(5)(sources.fromArray([x, x * 2]));
407 })(source)
408 );
409
410 jest.advanceTimersByTime(14);
411 expect(fn.mock.calls).toEqual([
412 [1],
413 [2],
414 ]);
415
416 jest.runAllTimers();
417 expect(fn.mock.calls).toEqual([
418 [1],
419 [2],
420 [10],
421 [20],
422 ]);
423 });
424});
425
426describe('debounce', () => {
427 const noop = web.debounce(() => 0);
428 passesPassivePull(noop);
429 passesActivePush(noop);
430 passesSinkClose(noop);
431 passesSourceEnd(noop);
432 passesSingleStart(noop);
433 // TODO: passesStrictEnd(noop);
434 passesAsyncSequence(noop);
435
436 it('waits for a specified amount of silence before emitting the last value', () => {
437 const { source, next } = sources.makeSubject<number>();
438 const fn = jest.fn();
439
440 sinks.forEach(fn)(web.debounce(() => 100)(source));
441
442 next(1);
443 jest.advanceTimersByTime(50);
444 expect(fn).not.toHaveBeenCalled();
445
446 next(2);
447 jest.advanceTimersByTime(99);
448 expect(fn).not.toHaveBeenCalled();
449
450 jest.advanceTimersByTime(1);
451 expect(fn).toHaveBeenCalledWith(2);
452 });
453});
454
455describe('delay', () => {
456 const noop = web.delay(0);
457 passesPassivePull(noop);
458 passesActivePush(noop);
459 // TODO: passesSinkClose(noop);
460 passesSourceEnd(noop);
461 passesSingleStart(noop);
462 passesAsyncSequence(noop);
463
464 it('delays outputs by a specified delay timeout value', () => {
465 const { source, next } = sources.makeSubject();
466 const fn = jest.fn();
467
468 sinks.forEach(fn)(web.delay(100)(source));
469
470 next(1);
471 expect(fn).not.toHaveBeenCalled();
472
473 jest.advanceTimersByTime(100);
474 expect(fn).toHaveBeenCalledWith(1);
475 });
476});
477
478describe('filter', () => {
479 const noop = operators.filter(() => true);
480 passesPassivePull(noop);
481 passesActivePush(noop);
482 passesSinkClose(noop);
483 passesSourceEnd(noop);
484 passesSingleStart(noop);
485 passesAsyncSequence(noop);
486
487 it('prevents emissions for which a predicate fails', () => {
488 const { source, next } = sources.makeSubject();
489 const fn = jest.fn();
490
491 sinks.forEach(fn)(operators.filter(x => !!x)(source));
492
493 next(false);
494 expect(fn).not.toHaveBeenCalled();
495
496 next(true);
497 expect(fn).toHaveBeenCalledWith(true);
498 });
499});
500
501describe('map', () => {
502 const noop = operators.map(x => x);
503 passesPassivePull(noop);
504 passesActivePush(noop);
505 passesSinkClose(noop);
506 passesSourceEnd(noop);
507 passesSingleStart(noop);
508 // TODO: passesStrictEnd(noop);
509 passesAsyncSequence(noop);
510
511 it('maps over values given a transform function', () => {
512 const { source, next } = sources.makeSubject<number>();
513 const fn = jest.fn();
514
515 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));
516
517 next(1);
518 expect(fn).toHaveBeenCalledWith(2);
519 });
520});
521
522describe('mergeMap', () => {
523 const noop = operators.mergeMap(x => sources.fromValue(x));
524 // TODO: passesPassivePull(noop);
525 // TODO: passesActivePush(noop);
526 // TODO: passesSinkClose(noop);
527 // TODO: passesSourceEnd(noop);
528 passesSingleStart(noop);
529 passesStrictEnd(noop);
530 passesAsyncSequence(noop);
531
532 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
533 it('emits values from each flattened synchronous source', () => {
534 const { source, next, complete } = sources.makeSubject<number>();
535 const fn = jest.fn();
536
537 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
538
539 next(1);
540 next(3);
541 complete();
542
543 expect(fn).toHaveBeenCalledTimes(6);
544 expect(fn.mock.calls).toEqual([
545 [deriving.start(expect.any(Function))],
546 [deriving.push(1)],
547 [deriving.push(2)],
548 [deriving.push(3)],
549 [deriving.push(4)],
550 [deriving.end()],
551 ]);
552 });
553
554 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
555 it('emits values from each flattened asynchronous source simultaneously', () => {
556 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
557 const fn = jest.fn();
558
559 sinks.forEach(fn)(
560 operators.mergeMap((x: number) => {
561 return web.delay(5)(sources.fromArray([x, x * 2]));
562 })(source)
563 );
564
565 jest.advanceTimersByTime(14);
566 expect(fn.mock.calls).toEqual([
567 [1],
568 [10],
569 [2],
570 [20],
571 ]);
572 });
573});
574
575describe('onEnd', () => {
576 const noop = operators.onEnd(() => {});
577 passesPassivePull(noop);
578 passesActivePush(noop);
579 passesSinkClose(noop);
580 passesSourceEnd(noop);
581 passesSingleStart(noop);
582 passesAsyncSequence(noop);
583
584 it('calls a callback when the source ends', () => {
585 const { source, next, complete } = sources.makeSubject<number>();
586 const fn = jest.fn();
587
588 sinks.forEach(() => {})(operators.onEnd(fn)(source));
589
590 next(null);
591 expect(fn).not.toHaveBeenCalled();
592
593 complete();
594 expect(fn).toHaveBeenCalled();
595 });
596});
597
598describe('onPush', () => {
599 const noop = operators.onPush(() => {});
600 passesPassivePull(noop);
601 passesActivePush(noop);
602 passesSinkClose(noop);
603 passesSourceEnd(noop);
604 passesSingleStart(noop);
605 passesAsyncSequence(noop);
606
607 it('calls a callback when the source emits', () => {
608 const { source, next } = sources.makeSubject<number>();
609 const fn = jest.fn();
610
611 sinks.forEach(() => {})(operators.onPush(fn)(source));
612
613 next(1);
614 expect(fn).toHaveBeenCalledWith(1);
615 next(2);
616 expect(fn).toHaveBeenCalledWith(2);
617 });
618
619 it('is the same as `tap`', () => {
620 expect(operators.onPush).toBe(operators.tap);
621 });
622});
623
624describe('onStart', () => {
625 const noop = operators.onStart(() => {});
626 passesPassivePull(noop);
627 passesActivePush(noop);
628 passesSinkClose(noop);
629 passesSourceEnd(noop);
630 passesSingleStart(noop);
631 passesAsyncSequence(noop);
632
633 it('is called when the source starts', () => {
634 let sink: types.sinkT<any>;
635
636 const fn = jest.fn();
637 const source: types.sourceT<any> = _sink => { sink = _sink; };
638
639 sinks.forEach(() => {})(operators.onStart(fn)(source));
640
641 expect(fn).not.toHaveBeenCalled();
642
643 sink(deriving.start(() => {}));
644 expect(fn).toHaveBeenCalled();
645 });
646});
647
648describe('sample', () => {
649 const noop = operators.sample(sources.fromValue(null));
650 // TODO: passesPassivePull(noop);
651 // TODO: passesActivePush(noop);
652 // TODO: passesSinkClose(noop);
653 // TODO: passesSourceEnd(noop);
654 passesSingleStart(noop);
655 // TODO: passesStrictEnd(noop);
656
657 it('emits the latest value when a notifier source emits', () => {
658 const { source: notifier$, next: notify } = sources.makeSubject();
659 const { source: input$, next } = sources.makeSubject();
660 const fn = jest.fn();
661
662 sinks.forEach(fn)(operators.sample(notifier$)(input$));
663
664 next(1);
665 next(2);
666 expect(fn).not.toHaveBeenCalled();
667
668 notify(null);
669 expect(fn).toHaveBeenCalledWith(2);
670 });
671});
672
673describe('scan', () => {
674 const noop = operators.scan((_acc, x) => x, null);
675 passesPassivePull(noop);
676 passesActivePush(noop);
677 passesSinkClose(noop);
678 passesSourceEnd(noop);
679 passesSingleStart(noop);
680 passesAsyncSequence(noop);
681
682 it('folds values continuously with a reducer and initial value', () => {
683 const { source: input$, next } = sources.makeSubject<number>();
684 const fn = jest.fn();
685
686 const reducer = (acc: number, x: number) => acc + x;
687 sinks.forEach(fn)(operators.scan(reducer, 0)(input$));
688
689 next(1);
690 expect(fn).toHaveBeenCalledWith(1);
691 next(2);
692 expect(fn).toHaveBeenCalledWith(3);
693 });
694});
695
696describe('share', () => {
697 const noop = operators.share;
698 passesPassivePull(noop);
699 passesActivePush(noop);
700 passesSinkClose(noop);
701 passesSourceEnd(noop);
702 passesSingleStart(noop);
703 passesStrictEnd(noop);
704 passesAsyncSequence(noop);
705
706 it('shares output values between sinks', () => {
707 let push = () => {};
708
709 const source: types.sourceT<any> = operators.share(sink => {
710 sink(deriving.start(() => {}));
711 push = () => {
712 sink(deriving.push([0]));
713 sink(deriving.end());
714 };
715 });
716
717 const fnA = jest.fn();
718 const fnB = jest.fn();
719
720 sinks.forEach(fnA)(source);
721 sinks.forEach(fnB)(source);
722 push();
723
724 expect(fnA).toHaveBeenCalledWith([0]);
725 expect(fnB).toHaveBeenCalledWith([0]);
726 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
727 });
728});
729
730describe('skip', () => {
731 const noop = operators.skip(0);
732 passesPassivePull(noop);
733 passesActivePush(noop);
734 passesSinkClose(noop);
735 passesSourceEnd(noop);
736 passesSingleStart(noop);
737 passesAsyncSequence(noop);
738
739 it('skips a number of values before emitting normally', () => {
740 const { source, next } = sources.makeSubject<number>();
741 const fn = jest.fn();
742
743 sinks.forEach(fn)(operators.skip(1)(source));
744
745 next(1);
746 expect(fn).not.toHaveBeenCalled();
747 next(2);
748 expect(fn).toHaveBeenCalledWith(2);
749 });
750});
751
752describe('skipUntil', () => {
753 const noop = operators.skipUntil(sources.fromValue(null));
754 // TODO: passesPassivePull(noop);
755 // TODO: passesActivePush(noop);
756 // TODO: passesSinkClose(noop);
757 passesSourceEnd(noop);
758 passesSingleStart(noop);
759 passesAsyncSequence(noop);
760
761 it('skips values until the notifier source emits', () => {
762 const { source: notifier$, next: notify } = sources.makeSubject();
763 const { source: input$, next } = sources.makeSubject<number>();
764 const fn = jest.fn();
765
766 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));
767
768 next(1);
769 expect(fn).not.toHaveBeenCalled();
770 notify(null);
771 next(2);
772 expect(fn).toHaveBeenCalledWith(2);
773 });
774});
775
776describe('skipWhile', () => {
777 const noop = operators.skipWhile(() => false);
778 passesPassivePull(noop);
779 passesActivePush(noop);
780 passesSinkClose(noop);
781 passesSourceEnd(noop);
782 passesSingleStart(noop);
783 passesAsyncSequence(noop);
784
785 it('skips values until one fails a predicate', () => {
786 const { source, next } = sources.makeSubject<number>();
787 const fn = jest.fn();
788
789 sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source));
790
791 next(1);
792 expect(fn).not.toHaveBeenCalled();
793 next(2);
794 expect(fn).toHaveBeenCalledWith(2);
795 });
796});
797
798describe('switchMap', () => {
799 const noop = operators.switchMap(x => sources.fromValue(x));
800 // TODO: passesPassivePull(noop);
801 // TODO: passesActivePush(noop);
802 // TODO: passesSinkClose(noop);
803 // TODO: passesSourceEnd(noop);
804 passesSingleStart(noop);
805 passesStrictEnd(noop);
806 passesAsyncSequence(noop);
807
808 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
809 it('emits values from each flattened synchronous source', () => {
810 const { source, next, complete } = sources.makeSubject<number>();
811 const fn = jest.fn();
812
813 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
814
815 next(1);
816 next(3);
817 complete();
818
819 expect(fn).toHaveBeenCalledTimes(6);
820 expect(fn.mock.calls).toEqual([
821 [deriving.start(expect.any(Function))],
822 [deriving.push(1)],
823 [deriving.push(2)],
824 [deriving.push(3)],
825 [deriving.push(4)],
826 [deriving.end()],
827 ]);
828 });
829
830 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
831 it('emits values from each flattened asynchronous source, one at a time', () => {
832 const source = web.delay<number>(4)(sources.fromArray([1, 10]));
833 const fn = jest.fn();
834
835 sinks.forEach(fn)(
836 operators.switchMap((x: number) => {
837 return web.delay(5)(sources.fromArray([x, x * 2]));
838 })(source)
839 );
840
841 jest.runAllTimers();
842 expect(fn.mock.calls).toEqual([
843 [1],
844 [10],
845 [20],
846 ]);
847 });
848});
849
850describe('take', () => {
851 const noop = operators.take(10);
852 passesPassivePull(noop);
853 passesActivePush(noop);
854 // TODO: passesSinkClose(noop);
855 passesSourceEnd(noop);
856 passesSingleStart(noop);
857 passesStrictEnd(noop);
858 passesAsyncSequence(noop);
859
860 // TODO: take(0) seems to be broken
861 const ending = operators.take(1);
862 passesCloseAndEnd(ending);
863
864 it('emits values until a maximum is reached', () => {
865 const { source, next } = sources.makeSubject<number>();
866 const fn = jest.fn();
867
868 operators.take(1)(source)(fn);
869 next(1);
870
871 expect(fn).toHaveBeenCalledTimes(3);
872 expect(fn.mock.calls).toEqual([
873 [deriving.start(expect.any(Function))],
874 [deriving.push(1)],
875 [deriving.end()],
876 ]);
877 });
878});
879
880describe('takeUntil', () => {
881 const noop = operators.takeUntil(sources.never);
882 passesPassivePull(noop);
883 passesActivePush(noop);
884 passesSinkClose(noop);
885 passesSourceEnd(noop);
886 passesSingleStart(noop);
887 passesStrictEnd(noop);
888 passesAsyncSequence(noop);
889
890 const ending = operators.takeUntil(sources.fromValue(null));
891 passesCloseAndEnd(ending);
892
893 it('emits values until a notifier emits', () => {
894 const { source: notifier$, next: notify } = sources.makeSubject<number>();
895 const { source: input$, next } = sources.makeSubject<number>();
896 const fn = jest.fn();
897
898 operators.takeUntil(notifier$)(input$)(fn);
899 next(1);
900
901 expect(fn).toHaveBeenCalledTimes(2);
902 expect(fn.mock.calls).toEqual([
903 [deriving.start(expect.any(Function))],
904 [deriving.push(1)],
905 ]);
906
907 notify(null);
908 expect(fn).toHaveBeenCalledTimes(3);
909 expect(fn.mock.calls[2][0]).toEqual(deriving.end());
910 });
911});
912
913describe('takeWhile', () => {
914 const noop = operators.takeWhile(() => true);
915 passesPassivePull(noop);
916 passesActivePush(noop);
917 passesSinkClose(noop);
918 passesSourceEnd(noop);
919 // TODO: passesSingleStart(noop);
920 passesStrictEnd(noop);
921 passesAsyncSequence(noop);
922
923 const ending = operators.takeWhile(() => false);
924 passesCloseAndEnd(ending);
925
926 it('emits values while a predicate passes for all values', () => {
927 const { source, next } = sources.makeSubject<number>();
928 const fn = jest.fn();
929
930 operators.takeWhile(x => x < 2)(source)(fn);
931 next(1);
932 next(2);
933
934 expect(fn).toHaveBeenCalledTimes(4);
935 expect(fn.mock.calls).toEqual([
936 [deriving.start(expect.any(Function))],
937 [deriving.start(expect.any(Function))],
938 [deriving.push(1)],
939 [deriving.end()],
940 ]);
941 });
942});
943
944describe('throttle', () => {
945 const noop = web.throttle(() => 0);
946 passesPassivePull(noop);
947 passesActivePush(noop);
948 passesSinkClose(noop);
949 passesSourceEnd(noop);
950 passesSingleStart(noop);
951 passesAsyncSequence(noop);
952
953 it('should ignore emissions for a period of time after a value', () => {
954 const { source, next } = sources.makeSubject<number>();
955 const fn = jest.fn();
956
957 sinks.forEach(fn)(web.throttle(() => 100)(source));
958
959 next(1);
960 expect(fn).toHaveBeenCalledWith(1);
961 jest.advanceTimersByTime(50);
962
963 next(2);
964 expect(fn).toHaveBeenCalledTimes(1);
965 jest.advanceTimersByTime(50);
966
967 next(3);
968 expect(fn).toHaveBeenCalledWith(3);
969 });
970});