1import { describe, it, expect, beforeEach, vi } from 'vitest';
2
3import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
4import { push, start } from '../helpers';
5
6import {
7 passesPassivePull,
8 passesActivePush,
9 passesSinkClose,
10 passesSourceEnd,
11 passesSingleStart,
12 passesStrictEnd,
13 passesSourcePushThenEnd,
14 passesAsyncSequence,
15 passesCloseAndEnd,
16} from './compliance';
17
18import * as sources from '../sources';
19import * as sinks from '../sinks';
20import * as operators from '../operators';
21
// Install Vitest's fake timers before every test so the timer-driven cases
// below can step time explicitly via `vi.advanceTimersByTime`/`vi.runAllTimers`.
beforeEach(function installFakeTimers() {
  vi.useFakeTimers();
});
25
describe('buffer', () => {
  // Notifier stub: sends Start, then answers every Pull with a single `null`
  // push. It never sends an End signal itself.
  const pushOnPull: Source<any> = sink => {
    const talkback: TalkbackFn = signal => {
      if (signal === TalkbackKind.Pull) {
        sink(push(null));
      }
    };

    sink(start(talkback));
  };

  const noop = operators.buffer(pushOnPull);

  // Shared compliance checks. The `[0]` argument is handed to the helper
  // unchanged (presumably the expected output — see ./compliance).
  passesPassivePull(noop, [0]);
  passesActivePush(noop, [0]);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop, [0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: trigger$, next: trigger } = sources.makeSubject();
    const { source: values$, next: emit } = sources.makeSubject();
    const onBatch = vi.fn();

    const subscribe = sinks.forEach(onBatch);
    subscribe(operators.buffer(trigger$)(values$));

    // Values are held back until the notifier fires.
    emit(1);
    emit(2);
    expect(onBatch).not.toHaveBeenCalled();

    trigger(null);
    expect(onBatch).toHaveBeenCalledWith([1, 2]);

    // The next batch only contains what arrived after the previous flush.
    emit(3);
    trigger(null);
    expect(onBatch).toHaveBeenCalledWith([3]);
  });
});
62
describe('concatMap', () => {
  // Maps every value to a source that emits just that value.
  const noop = operators.concatMap(x => sources.fromValue(x));

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // Synchronous case: concatMap, mergeMap and switchMap are expected to agree here.
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sinkSpy = vi.fn();

    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(outer$)(sinkSpy);

    emit(1);
    emit(3);
    finish();

    // Start, four pushes (two per outer value), then End.
    const expectedCalls = [
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ];

    expect(sinkSpy).toHaveBeenCalledTimes(6);
    expect(sinkSpy.mock.calls).toEqual(expectedCalls);
  });

  // Synchronous case: concatMap, mergeMap and switchMap are expected to agree here.
  it('lets inner sources finish when outer source ends', () => {
    const received: Signal<any>[] = [];
    const onTeardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes right away.
    const sink = (signal: Signal<any>) => {
      received.push(signal);
      if (signal === SignalKind.End || signal.tag !== SignalKind.Start) {
        return;
      }

      const talkback = signal[0];
      talkback(TalkbackKind.Pull);
      talkback(TalkbackKind.Close);
    };

    const toInner = () => sources.make(() => onTeardown);
    operators.concatMap(toInner)(sources.fromValue(null))(sink);

    expect(onTeardown).toHaveBeenCalled();
    expect(received).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where concatMap differs from mergeMap & switchMap.
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(
      operators.concatMap((x: number) => {
        const pair$ = sources.fromArray([x, x * 2]);
        return operators.delay(5)(pair$);
      })(outer$)
    );

    // After 14ms only the first inner source has produced its values.
    vi.advanceTimersByTime(14);
    expect(onValue.mock.calls).toEqual([[1], [2]]);

    vi.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(
      operators.concatMap(() =>
        sources.make(observer => {
          // Emit from a timer callback rather than synchronously on subscribe.
          setTimeout(() => observer.next(1));
          return () => {};
        })
      )(sources.fromValue(null))
    );

    vi.runAllTimers();
    expect(onValue).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const collected: any[] = [];

    const subscribe = sinks.forEach(x => collected.push(x));
    subscribe(operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])]));

    expect(collected).toEqual([1, 2, 3, 4]);
  });
});
159
describe('debounce', () => {
  // Zero-length debounce window for the shared compliance checks.
  const noop = operators.debounce(() => 0);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    vi.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    // A new value arrives mid-window; nothing may be emitted until a full
    // 100ms has elapsed after it.
    next(2);
    vi.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    // The source completes while the value is still pending.
    next(1);
    complete();
    vi.advanceTimersByTime(100);

    // `1` is the only value ever pushed, so it must arrive exactly once.
    // Asserting the value (not just "was called") pins what the title promises.
    expect(fn).toHaveBeenCalledTimes(1);
    expect(fn).toHaveBeenCalledWith(1);
  });
});
200
describe('delay', () => {
  // Zero-millisecond delay for the shared compliance checks.
  const noop = operators.delay(0);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source: input$, next: emit } = sources.makeSubject();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.delay(100)(input$));

    // Nothing is forwarded until the fake clock has advanced.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    vi.advanceTimersByTime(100);
    expect(onValue).toHaveBeenCalledWith(1);
  });
});
223
describe('filter', () => {
  // Predicate that accepts everything, for the shared compliance checks.
  const noop = operators.filter(() => true);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source: flags$, next: emit } = sources.makeSubject<boolean>();
    const onValue = vi.fn();

    // The subscriber is typed as receiving `true`, matching the type predicate
    // handed to `filter`.
    const subscribe = sinks.forEach((value: true) => {
      onValue(value);
    });
    subscribe(operators.filter((x): x is true => Boolean(x))(flags$));

    emit(false);
    expect(onValue).not.toHaveBeenCalled();

    emit(true);
    expect(onValue).toHaveBeenCalledWith(true);
  });
});
248
describe('map', () => {
  // Identity transform for the shared compliance checks.
  const noop = operators.map(x => x);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.map((x: number) => x + 1)(numbers$));

    emit(1);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
268
describe('mergeMap', () => {
  // Maps every value to a source that emits just that value.
  const noop = operators.mergeMap(x => sources.fromValue(x));

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // Synchronous case: mergeMap, concatMap and switchMap are expected to agree here.
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sinkSpy = vi.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(outer$)(sinkSpy);

    emit(1);
    emit(3);
    finish();

    // Start, four pushes (two per outer value), then End.
    const expectedCalls = [
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ];

    expect(sinkSpy.mock.calls).toEqual(expectedCalls);
  });

  // Synchronous case: mergeMap, concatMap and switchMap are expected to agree here.
  it('lets inner sources finish when outer source ends', () => {
    const received: Signal<any>[] = [];
    const onTeardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes right away.
    const sink = (signal: Signal<any>) => {
      received.push(signal);
      if (signal === SignalKind.End || signal.tag !== SignalKind.Start) {
        return;
      }

      const talkback = signal[0];
      talkback(TalkbackKind.Pull);
      talkback(TalkbackKind.Close);
    };

    const toInner = () => sources.make(() => onTeardown);
    operators.mergeMap(toInner)(sources.fromValue(null))(sink);

    expect(onTeardown).toHaveBeenCalled();
    expect(received).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where mergeMap differs from concatMap & switchMap.
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(
      operators.mergeMap((x: number) => {
        const pair$ = sources.fromArray([x, x * 2]);
        return operators.delay(5)(pair$);
      })(outer$)
    );

    vi.runAllTimers();

    // Values of both inner sources are interleaved rather than sequenced.
    expect(onValue.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  it('emits synchronous values in order', () => {
    const collected: any[] = [];

    const subscribe = sinks.forEach(x => collected.push(x));
    subscribe(operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])]));

    expect(collected).toEqual([1, 2, 3, 4]);
  });
});
345
describe('onEnd', () => {
  // Empty callback for the shared compliance checks.
  const noop = operators.onEnd(() => {});

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source: input$, next: emit, complete: finish } = sources.makeSubject<any>();
    const onEnded = vi.fn();

    const subscribe = sinks.forEach(() => {});
    subscribe(operators.onEnd(onEnded)(input$));

    // A plain value must not trigger the callback.
    emit(null);
    expect(onEnded).not.toHaveBeenCalled();

    finish();
    expect(onEnded).toHaveBeenCalled();
  });
});
369
describe('onPush', () => {
  // Empty callback for the shared compliance checks.
  const noop = operators.onPush(() => {});

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(() => {});
    subscribe(operators.onPush(onValue)(numbers$));

    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);

    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    // Both export names must reference the very same function.
    const { onPush, tap } = operators;
    expect(onPush).toBe(tap);
  });
});
396
describe('onStart', () => {
  // Empty callback for the shared compliance checks.
  const noop = operators.onStart(() => {});

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    // Assigned by the stub source below once it is subscribed to.
    let capturedSink: Sink<any>;

    const onStarted = vi.fn();

    // Stub source that only remembers its sink and sends nothing by itself,
    // letting the test decide when Start is delivered.
    const silent$: Source<any> = downstream => {
      capturedSink = downstream;
    };

    const subscribe = sinks.forEach(() => {});
    subscribe(operators.onStart(onStarted)(silent$));

    expect(onStarted).not.toHaveBeenCalled();

    capturedSink!(start(() => {}));
    expect(onStarted).toHaveBeenCalled();
  });
});
422
describe('sample', () => {
  // Notifier stub: sends Start, then answers every Pull with a single `null`
  // push. It never sends an End signal itself.
  const pushOnPull: Source<any> = sink => {
    const talkback: TalkbackFn = signal => {
      if (signal === TalkbackKind.Pull) {
        sink(push(null));
      }
    };

    sink(start(talkback));
  };

  const noop = operators.sample(pushOnPull);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: trigger$, next: trigger } = sources.makeSubject();
    const { source: values$, next: emit } = sources.makeSubject();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.sample(trigger$)(values$));

    // Input values alone produce no output.
    emit(1);
    emit(2);
    expect(onValue).not.toHaveBeenCalled();

    // Only the most recent input value is emitted when the notifier fires.
    trigger(null);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
455
describe('scan', () => {
  // Reducer that always yields the incoming value, seeded with `null`.
  const noop = operators.scan<any, any>((_acc, x) => x, null);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const sum = (total: number, value: number) => total + value;

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.scan(sum, 0)(numbers$));

    // Each emission is the running total so far: 0 + 1, then 1 + 2.
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);

    emit(2);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});
478
describe('share', () => {
  // `share` takes a source directly, so it is passed to the checks as-is.
  const noop = operators.share;

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    // Replaced by the stub source below once it is subscribed to.
    let emit = () => {};

    const shared$: Source<any> = operators.share(sink => {
      sink(start(() => {}));
      emit = () => {
        sink(push([0]));
        sink(SignalKind.End);
      };
    });

    const first = vi.fn();
    const second = vi.fn();

    sinks.forEach(first)(shared$);
    sinks.forEach(second)(shared$);
    emit();

    expect(first).toHaveBeenCalledWith([0]);
    expect(second).toHaveBeenCalledWith([0]);

    // Both subscribers must receive the identical array instance.
    const [firstValue] = first.mock.calls[0];
    const [secondValue] = second.mock.calls[0];
    expect(firstValue).toBe(secondValue);
  });

  it('completes the source when no more sink is listening', () => {
    // Replaced by the stub source below once it is subscribed to.
    let emit = () => {};

    const talkbackSpy = vi.fn();
    const shared$: Source<any> = operators.share(sink => {
      sink(start(talkbackSpy));
      emit = () => {
        sink(push([0]));
        sink(push([1]));
        sink(SignalKind.End);
      };
    });

    const first = vi.fn();
    const second = vi.fn();

    // Each subscriber only takes one value.
    sinks.forEach(first)(operators.take(1)(shared$));
    sinks.forEach(second)(operators.take(1)(shared$));
    emit();

    expect(first).toHaveBeenCalledWith([0]);
    expect(second).toHaveBeenCalledWith([0]);

    const [firstValue] = first.mock.calls[0];
    const [secondValue] = second.mock.calls[0];
    expect(firstValue).toBe(secondValue);

    // The underlying source must have been told to close.
    expect(talkbackSpy).toHaveBeenCalledWith(TalkbackKind.Close);
  });
});
538
describe('skip', () => {
  // Skipping zero values for the shared compliance checks.
  const noop = operators.skip(0);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.skip(1)(numbers$));

    // The first value is dropped ...
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    // ... and the second one passes through.
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
560
describe('skipUntil', () => {
  // Uses a single-value source as the notifier for the shared compliance checks.
  const noop = operators.skipUntil(sources.fromValue(null));

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);
  passesStrictEnd(noop);

  it('skips values until the notifier source emits', () => {
    const { source: trigger$, next: trigger } = sources.makeSubject();
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.skipUntil(trigger$)(numbers$));

    // Dropped: the notifier has not fired yet.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    trigger(null);
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
585
describe('skipWhile', () => {
  // Predicate that is always false for the shared compliance checks.
  const noop = operators.skipWhile(() => false);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.skipWhile((x: any) => x <= 1)(numbers$));

    // `1 <= 1` holds, so the value is skipped.
    emit(1);
    expect(onValue).not.toHaveBeenCalled();

    // `2 <= 1` fails, so the value is emitted.
    emit(2);
    expect(onValue).toHaveBeenCalledWith(2);
  });
});
607
describe('switchMap', () => {
  // Maps every value to a source that emits just that value.
  const noop = operators.switchMap(x => sources.fromValue(x));

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // Synchronous case: switchMap, concatMap and mergeMap are expected to agree here.
  it('emits values from each flattened synchronous source', () => {
    const { source: outer$, next: emit, complete: finish } = sources.makeSubject<number>();
    const sinkSpy = vi.fn();

    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(outer$)(sinkSpy);

    emit(1);
    emit(3);
    finish();

    // Start, four pushes (two per outer value), then End.
    const expectedCalls = [
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ];

    expect(sinkSpy).toHaveBeenCalledTimes(6);
    expect(sinkSpy.mock.calls).toEqual(expectedCalls);
  });

  // Synchronous case: switchMap, concatMap and mergeMap are expected to agree here.
  it('lets inner sources finish when outer source ends', () => {
    const received: Signal<any>[] = [];
    const onTeardown = vi.fn();

    // Records every signal; on Start it pulls once and then closes right away.
    const sink = (signal: Signal<any>) => {
      received.push(signal);
      if (signal === SignalKind.End || signal.tag !== SignalKind.Start) {
        return;
      }

      const talkback = signal[0];
      talkback(TalkbackKind.Pull);
      talkback(TalkbackKind.Close);
    };

    const toInner = () => sources.make(() => onTeardown);
    operators.switchMap(toInner)(sources.fromValue(null))(sink);

    expect(onTeardown).toHaveBeenCalled();
    expect(received).toEqual([start(expect.any(Function))]);
  });

  // Asynchronous case: this is where switchMap differs from concatMap & mergeMap.
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const outer$ = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(
      operators.switchMap((x: number) => {
        // Inner source: the first two interval ticks, scaled to `x * 1` and `x * 2`.
        const scaled$ = operators.map((y: number) => x * (y + 1))(sources.interval(5));
        return operators.take(2)(scaled$);
      })(outer$)
    );

    vi.runAllTimers();
    expect(onValue.mock.calls).toEqual([[1], [10], [20]]);
  });
});
675
describe('take', () => {
  // A limit of ten values for the shared compliance checks.
  const noop = operators.take(10);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // A limit of zero is checked separately with the close-and-end helper.
  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const sinkSpy = vi.fn();

    operators.take(1)(numbers$)(sinkSpy);
    emit(1);

    // Start, the single permitted value, then End.
    const expectedCalls = [[start(expect.any(Function))], [push(1)], [SignalKind.End]];

    expect(sinkSpy).toHaveBeenCalledTimes(3);
    expect(sinkSpy.mock.calls).toEqual(expectedCalls);
  });
});
699
describe('takeUntil', () => {
  // A notifier that never emits, for the shared compliance checks.
  const noop = operators.takeUntil(sources.never);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // A notifier built from a single value is checked with the close-and-end helper.
  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<any>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    // Before the notifier fires, values pass through untouched.
    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);

    // Once the notifier fires, the third signal is End.
    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
  });

  // Renamed: this case used to share its title with the test above, which
  // made the two indistinguishable in reports and name filters.
  it('ends and closes the notifier when the notifier emits synchronously on pull', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    let hasClosed = false;

    // Notifier stub: pushes a value as soon as it is pulled (unless it has
    // already been closed) and records when it receives Close.
    operators.takeUntil(sink => {
      sink(
        start(talkback => {
          if (talkback === TalkbackKind.Close) {
            hasClosed = true;
          } else if (talkback === TalkbackKind.Pull && !hasClosed) {
            sink(push(1));
          }
        })
      );
    })(input$)(fn);

    next(1);

    // Only two signals arrive, and the value pushed by `next(1)` is not among
    // them. In this synchronous setup End is recorded before Start.
    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([[SignalKind.End], [start(expect.any(Function))]]);

    expect(hasClosed).toBe(true);
  });
});
755
describe('takeWhile', () => {
  // Predicate that is always true for the shared compliance checks.
  const noop = operators.takeWhile(() => true);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  // Predicate that is always false, checked with the close-and-end helper.
  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const sinkSpy = vi.fn();

    operators.takeWhile((x: any) => x < 2)(numbers$)(sinkSpy);
    emit(1);
    emit(2);

    // `1` passes the predicate; `2` fails it and is answered with End instead.
    const expectedCalls = [[start(expect.any(Function))], [push(1)], [SignalKind.End]];
    expect(sinkSpy.mock.calls).toEqual(expectedCalls);
  });
});
779
describe('takeLast', () => {
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source: numbers$, next: emit, complete: finish } = sources.makeSubject<number>();
    const received: Signal<any>[] = [];

    // Assigned when the Start signal arrives.
    let talkback: TalkbackFn;

    // Sink that records every signal and pulls after Start and after each push.
    operators.takeLast(1)(numbers$)(signal => {
      received.push(signal);

      if (signal === SignalKind.End) {
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        talkback(TalkbackKind.Pull);
      } else {
        talkback!(TalkbackKind.Pull);
      }
    });

    emit(1);
    emit(2);

    // Nothing, not even Start, has reached the sink before the source completes.
    expect(received.length).toBe(0);
    finish();

    expect(received).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
  });
});
809
describe('throttle', () => {
  // Zero-length throttle window for the shared compliance checks.
  const noop = operators.throttle(() => 0);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source: numbers$, next: emit } = sources.makeSubject<number>();
    const onValue = vi.fn();

    const subscribe = sinks.forEach(onValue);
    subscribe(operators.throttle(() => 100)(numbers$));

    // The first value goes straight through.
    emit(1);
    expect(onValue).toHaveBeenCalledWith(1);
    vi.advanceTimersByTime(50);

    // Halfway through the 100ms window: the value is ignored.
    emit(2);
    expect(onValue).toHaveBeenCalledTimes(1);
    vi.advanceTimersByTime(50);

    // The window has elapsed, so values are emitted again.
    emit(3);
    expect(onValue).toHaveBeenCalledWith(3);
  });
});