1import { describe, it, expect, beforeEach, vi } from 'vitest';
2
3import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
4import { push, start } from '../helpers';
5
6import {
7 passesPassivePull,
8 passesActivePush,
9 passesSinkClose,
10 passesSourceEnd,
11 passesSingleStart,
12 passesStrictEnd,
13 passesSourcePushThenEnd,
14 passesAsyncSequence,
15 passesCloseAndEnd,
16} from './compliance';
17
18import * as sources from '../sources';
19import * as sinks from '../sinks';
20import * as operators from '../operators';
21
22beforeEach(() => {
23 vi.useFakeTimers();
24});
25
26describe('buffer', () => {
27 const valueThenNever: Source<any> = sink =>
28 sink(
29 start(signal => {
30 if (signal === TalkbackKind.Pull) sink(push(null));
31 })
32 );
33
34 const noop = operators.buffer(valueThenNever);
35
36 passesPassivePull(noop, [0]);
37 passesActivePush(noop, [0]);
38 passesSinkClose(noop);
39 passesSourcePushThenEnd(noop, [0]);
40 passesSingleStart(noop);
41 passesStrictEnd(noop);
42
43 it('emits batches of input values when a notifier emits', () => {
44 const { source: notifier$, next: notify } = sources.makeSubject();
45 const { source: input$, next } = sources.makeSubject();
46 const fn = vi.fn();
47
48 sinks.forEach(fn)(operators.buffer(notifier$)(input$));
49
50 next(1);
51 next(2);
52 expect(fn).not.toHaveBeenCalled();
53
54 notify(null);
55 expect(fn).toHaveBeenCalledWith([1, 2]);
56
57 next(3);
58 notify(null);
59 expect(fn).toHaveBeenCalledWith([3]);
60 });
61});
62
63describe('concatMap', () => {
64 const noop = operators.concatMap(x => sources.fromValue(x));
65 passesPassivePull(noop);
66 passesActivePush(noop);
67 passesSinkClose(noop);
68 passesSourcePushThenEnd(noop);
69 passesSingleStart(noop);
70 passesStrictEnd(noop);
71 passesAsyncSequence(noop);
72
73 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
74 it('emits values from each flattened synchronous source', () => {
75 const { source, next, complete } = sources.makeSubject<number>();
76 const fn = vi.fn();
77
78 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
79
80 next(1);
81 next(3);
82 complete();
83
84 expect(fn).toHaveBeenCalledTimes(6);
85 expect(fn.mock.calls).toEqual([
86 [start(expect.any(Function))],
87 [push(1)],
88 [push(2)],
89 [push(3)],
90 [push(4)],
91 [SignalKind.End],
92 ]);
93 });
94
95 // This synchronous test for concatMap will behave the same as mergeMap & switchMap
96 it('lets inner sources finish when outer source ends', () => {
97 const signals: Signal<any>[] = [];
98 const teardown = vi.fn();
99 const fn = (signal: Signal<any>) => {
100 signals.push(signal);
101 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
102 signal[0](TalkbackKind.Pull);
103 signal[0](TalkbackKind.Close);
104 }
105 };
106
107 operators.concatMap(() => {
108 return sources.make(() => teardown);
109 })(sources.fromValue(null))(fn);
110
111 expect(teardown).toHaveBeenCalled();
112 expect(signals).toEqual([start(expect.any(Function))]);
113 });
114
115 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
116 it('emits values from each flattened asynchronous source, one at a time', () => {
117 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
118 const fn = vi.fn();
119
120 sinks.forEach(fn)(
121 operators.concatMap((x: number) => {
122 return operators.delay(5)(sources.fromArray([x, x * 2]));
123 })(source)
124 );
125
126 vi.advanceTimersByTime(14);
127 expect(fn.mock.calls).toEqual([[1], [2]]);
128
129 vi.runAllTimers();
130 expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
131 });
132
133 it('works for fully asynchronous sources', () => {
134 const fn = vi.fn();
135
136 sinks.forEach(fn)(
137 operators.concatMap(() => {
138 return sources.make(observer => {
139 setTimeout(() => observer.next(1));
140 return () => {};
141 });
142 })(sources.fromValue(null))
143 );
144
145 vi.runAllTimers();
146 expect(fn).toHaveBeenCalledWith(1);
147 });
148
149 it('emits synchronous values in order', () => {
150 const values: any[] = [];
151
152 sinks.forEach(x => values.push(x))(
153 operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
154 );
155
156 expect(values).toEqual([1, 2, 3, 4]);
157 });
158});
159
160describe('debounce', () => {
161 const noop = operators.debounce(() => 0);
162 passesPassivePull(noop);
163 passesActivePush(noop);
164 passesSinkClose(noop);
165 passesSourceEnd(noop);
166 passesSingleStart(noop);
167 passesStrictEnd(noop);
168 passesAsyncSequence(noop);
169
170 it('waits for a specified amount of silence before emitting the last value', () => {
171 const { source, next } = sources.makeSubject<number>();
172 const fn = vi.fn();
173
174 sinks.forEach(fn)(operators.debounce(() => 100)(source));
175
176 next(1);
177 vi.advanceTimersByTime(50);
178 expect(fn).not.toHaveBeenCalled();
179
180 next(2);
181 vi.advanceTimersByTime(99);
182 expect(fn).not.toHaveBeenCalled();
183
184 vi.advanceTimersByTime(1);
185 expect(fn).toHaveBeenCalledWith(2);
186 });
187
188 it('emits debounced value with delayed End signal', () => {
189 const { source, next, complete } = sources.makeSubject<number>();
190 const fn = vi.fn();
191
192 sinks.forEach(fn)(operators.debounce(() => 100)(source));
193
194 next(1);
195 complete();
196 vi.advanceTimersByTime(100);
197 expect(fn).toHaveBeenCalled();
198 });
199});
200
201describe('delay', () => {
202 const noop = operators.delay(0);
203 passesPassivePull(noop);
204 passesActivePush(noop);
205 passesSinkClose(noop);
206 passesSourceEnd(noop);
207 passesSingleStart(noop);
208 passesAsyncSequence(noop);
209
210 it('delays outputs by a specified delay timeout value', () => {
211 const { source, next } = sources.makeSubject();
212 const fn = vi.fn();
213
214 sinks.forEach(fn)(operators.delay(100)(source));
215
216 next(1);
217 expect(fn).not.toHaveBeenCalled();
218
219 vi.advanceTimersByTime(100);
220 expect(fn).toHaveBeenCalledWith(1);
221 });
222});
223
224describe('filter', () => {
225 const noop = operators.filter(() => true);
226 passesPassivePull(noop);
227 passesActivePush(noop);
228 passesSinkClose(noop);
229 passesSourceEnd(noop);
230 passesSingleStart(noop);
231 passesAsyncSequence(noop);
232
233 it('prevents emissions for which a predicate fails', () => {
234 const { source, next } = sources.makeSubject();
235 const fn = vi.fn();
236
237 sinks.forEach(fn)(operators.filter(x => !!x)(source));
238
239 next(false);
240 expect(fn).not.toHaveBeenCalled();
241
242 next(true);
243 expect(fn).toHaveBeenCalledWith(true);
244 });
245});
246
247describe('map', () => {
248 const noop = operators.map(x => x);
249 passesPassivePull(noop);
250 passesActivePush(noop);
251 passesSinkClose(noop);
252 passesSourceEnd(noop);
253 passesSingleStart(noop);
254 passesAsyncSequence(noop);
255
256 it('maps over values given a transform function', () => {
257 const { source, next } = sources.makeSubject<number>();
258 const fn = vi.fn();
259
260 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));
261
262 next(1);
263 expect(fn).toHaveBeenCalledWith(2);
264 });
265});
266
267describe('mergeMap', () => {
268 const noop = operators.mergeMap(x => sources.fromValue(x));
269 passesPassivePull(noop);
270 passesActivePush(noop);
271 passesSinkClose(noop);
272 passesSourcePushThenEnd(noop);
273 passesSingleStart(noop);
274 passesStrictEnd(noop);
275 passesAsyncSequence(noop);
276
277 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
278 it('emits values from each flattened synchronous source', () => {
279 const { source, next, complete } = sources.makeSubject<number>();
280 const fn = vi.fn();
281
282 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
283
284 next(1);
285 next(3);
286 complete();
287
288 expect(fn.mock.calls).toEqual([
289 [start(expect.any(Function))],
290 [push(1)],
291 [push(2)],
292 [push(3)],
293 [push(4)],
294 [SignalKind.End],
295 ]);
296 });
297
298 // This synchronous test for mergeMap will behave the same as concatMap & switchMap
299 it('lets inner sources finish when outer source ends', () => {
300 const values: Signal<any>[] = [];
301 const teardown = vi.fn();
302 const fn = (signal: Signal<any>) => {
303 values.push(signal);
304 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
305 signal[0](TalkbackKind.Pull);
306 signal[0](TalkbackKind.Close);
307 }
308 };
309
310 operators.mergeMap(() => {
311 return sources.make(() => teardown);
312 })(sources.fromValue(null))(fn);
313
314 expect(teardown).toHaveBeenCalled();
315 expect(values).toEqual([start(expect.any(Function))]);
316 });
317
318 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
319 it('emits values from each flattened asynchronous source simultaneously', () => {
320 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
321 const fn = vi.fn();
322
323 sinks.forEach(fn)(
324 operators.mergeMap((x: number) => {
325 return operators.delay(5)(sources.fromArray([x, x * 2]));
326 })(source)
327 );
328
329 vi.runAllTimers();
330 expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
331 });
332
333 it('emits synchronous values in order', () => {
334 const values: any[] = [];
335
336 sinks.forEach(x => values.push(x))(
337 operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
338 );
339
340 expect(values).toEqual([1, 2, 3, 4]);
341 });
342});
343
344describe('onEnd', () => {
345 const noop = operators.onEnd(() => {});
346 passesPassivePull(noop);
347 passesActivePush(noop);
348 passesSinkClose(noop);
349 passesSourceEnd(noop);
350 passesStrictEnd(noop);
351 passesSingleStart(noop);
352 passesAsyncSequence(noop);
353
354 it('calls a callback when the source ends', () => {
355 const { source, next, complete } = sources.makeSubject<any>();
356 const fn = vi.fn();
357
358 sinks.forEach(() => {})(operators.onEnd(fn)(source));
359
360 next(null);
361 expect(fn).not.toHaveBeenCalled();
362
363 complete();
364 expect(fn).toHaveBeenCalled();
365 });
366});
367
368describe('onPush', () => {
369 const noop = operators.onPush(() => {});
370 passesPassivePull(noop);
371 passesActivePush(noop);
372 passesSinkClose(noop);
373 passesSourceEnd(noop);
374 passesStrictEnd(noop);
375 passesSingleStart(noop);
376 passesAsyncSequence(noop);
377
378 it('calls a callback when the source emits', () => {
379 const { source, next } = sources.makeSubject<number>();
380 const fn = vi.fn();
381
382 sinks.forEach(() => {})(operators.onPush(fn)(source));
383
384 next(1);
385 expect(fn).toHaveBeenCalledWith(1);
386 next(2);
387 expect(fn).toHaveBeenCalledWith(2);
388 });
389
390 it('is the same as `tap`', () => {
391 expect(operators.onPush).toBe(operators.tap);
392 });
393});
394
395describe('onStart', () => {
396 const noop = operators.onStart(() => {});
397 passesPassivePull(noop);
398 passesActivePush(noop);
399 passesSinkClose(noop);
400 passesSourceEnd(noop);
401 passesSingleStart(noop);
402 passesAsyncSequence(noop);
403
404 it('is called when the source starts', () => {
405 let sink: Sink<any>;
406
407 const fn = vi.fn();
408 const source: Source<any> = _sink => {
409 sink = _sink;
410 };
411
412 sinks.forEach(() => {})(operators.onStart(fn)(source));
413
414 expect(fn).not.toHaveBeenCalled();
415
416 sink!(start(() => {}));
417 expect(fn).toHaveBeenCalled();
418 });
419});
420
421describe('sample', () => {
422 const valueThenNever: Source<any> = sink =>
423 sink(
424 start(signal => {
425 if (signal === TalkbackKind.Pull) sink(push(null));
426 })
427 );
428
429 const noop = operators.sample(valueThenNever);
430
431 passesPassivePull(noop);
432 passesActivePush(noop);
433 passesSinkClose(noop);
434 passesSourcePushThenEnd(noop);
435 passesSingleStart(noop);
436 passesStrictEnd(noop);
437
438 it('emits the latest value when a notifier source emits', () => {
439 const { source: notifier$, next: notify } = sources.makeSubject();
440 const { source: input$, next } = sources.makeSubject();
441 const fn = vi.fn();
442
443 sinks.forEach(fn)(operators.sample(notifier$)(input$));
444
445 next(1);
446 next(2);
447 expect(fn).not.toHaveBeenCalled();
448
449 notify(null);
450 expect(fn).toHaveBeenCalledWith(2);
451 });
452});
453
454describe('scan', () => {
455 const noop = operators.scan<any, any>((_acc, x) => x, null);
456 passesPassivePull(noop);
457 passesActivePush(noop);
458 passesSinkClose(noop);
459 passesSourceEnd(noop);
460 passesSingleStart(noop);
461 passesAsyncSequence(noop);
462
463 it('folds values continuously with a reducer and initial value', () => {
464 const { source: input$, next } = sources.makeSubject<number>();
465 const fn = vi.fn();
466
467 const reducer = (acc: number, x: number) => acc + x;
468 sinks.forEach(fn)(operators.scan(reducer, 0)(input$));
469
470 next(1);
471 expect(fn).toHaveBeenCalledWith(1);
472 next(2);
473 expect(fn).toHaveBeenCalledWith(3);
474 });
475});
476
477describe('share', () => {
478 const noop = operators.share;
479 passesPassivePull(noop);
480 passesActivePush(noop);
481 passesSinkClose(noop);
482 passesSourceEnd(noop);
483 passesSingleStart(noop);
484 passesStrictEnd(noop);
485 passesAsyncSequence(noop);
486
487 it('shares output values between sinks', () => {
488 let onPush = () => {};
489
490 const source: Source<any> = operators.share(sink => {
491 sink(start(() => {}));
492 onPush = () => {
493 sink(push([0]));
494 sink(SignalKind.End);
495 };
496 });
497
498 const fnA = vi.fn();
499 const fnB = vi.fn();
500
501 sinks.forEach(fnA)(source);
502 sinks.forEach(fnB)(source);
503 onPush();
504
505 expect(fnA).toHaveBeenCalledWith([0]);
506 expect(fnB).toHaveBeenCalledWith([0]);
507 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
508 });
509
510 it('completes the source when no more sink is listening', () => {
511 let onPush = () => {};
512
513 const talkback = vi.fn();
514 const source: Source<any> = operators.share(sink => {
515 sink(start(talkback));
516 onPush = () => {
517 sink(push([0]));
518 sink(push([1]));
519 sink(SignalKind.End);
520 };
521 });
522
523 const fnA = vi.fn();
524 const fnB = vi.fn();
525
526 sinks.forEach(fnA)(operators.take(1)(source));
527 sinks.forEach(fnB)(operators.take(1)(source));
528 onPush();
529
530 expect(fnA).toHaveBeenCalledWith([0]);
531 expect(fnB).toHaveBeenCalledWith([0]);
532 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
533 expect(talkback).toHaveBeenCalledWith(TalkbackKind.Close);
534 });
535});
536
537describe('skip', () => {
538 const noop = operators.skip(0);
539 passesPassivePull(noop);
540 passesActivePush(noop);
541 passesSinkClose(noop);
542 passesSourceEnd(noop);
543 passesSingleStart(noop);
544 passesAsyncSequence(noop);
545
546 it('skips a number of values before emitting normally', () => {
547 const { source, next } = sources.makeSubject<number>();
548 const fn = vi.fn();
549
550 sinks.forEach(fn)(operators.skip(1)(source));
551
552 next(1);
553 expect(fn).not.toHaveBeenCalled();
554 next(2);
555 expect(fn).toHaveBeenCalledWith(2);
556 });
557});
558
559describe('skipUntil', () => {
560 const noop = operators.skipUntil(sources.fromValue(null));
561 passesPassivePull(noop);
562 passesActivePush(noop);
563 passesSinkClose(noop);
564 passesSourceEnd(noop);
565 passesSingleStart(noop);
566 passesAsyncSequence(noop);
567 passesStrictEnd(noop);
568
569 it('skips values until the notifier source emits', () => {
570 const { source: notifier$, next: notify } = sources.makeSubject();
571 const { source: input$, next } = sources.makeSubject<number>();
572 const fn = vi.fn();
573
574 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));
575
576 next(1);
577 expect(fn).not.toHaveBeenCalled();
578 notify(null);
579 next(2);
580 expect(fn).toHaveBeenCalledWith(2);
581 });
582});
583
584describe('skipWhile', () => {
585 const noop = operators.skipWhile(() => false);
586 passesPassivePull(noop);
587 passesActivePush(noop);
588 passesSinkClose(noop);
589 passesSourceEnd(noop);
590 passesSingleStart(noop);
591 passesAsyncSequence(noop);
592
593 it('skips values until one fails a predicate', () => {
594 const { source, next } = sources.makeSubject<number>();
595 const fn = vi.fn();
596
597 sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source));
598
599 next(1);
600 expect(fn).not.toHaveBeenCalled();
601 next(2);
602 expect(fn).toHaveBeenCalledWith(2);
603 });
604});
605
606describe('switchMap', () => {
607 const noop = operators.switchMap(x => sources.fromValue(x));
608 passesPassivePull(noop);
609 passesActivePush(noop);
610 passesSinkClose(noop);
611 passesSourcePushThenEnd(noop);
612 passesSingleStart(noop);
613 passesStrictEnd(noop);
614 passesAsyncSequence(noop);
615
616 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
617 it('emits values from each flattened synchronous source', () => {
618 const { source, next, complete } = sources.makeSubject<number>();
619 const fn = vi.fn();
620
621 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);
622
623 next(1);
624 next(3);
625 complete();
626
627 expect(fn).toHaveBeenCalledTimes(6);
628 expect(fn.mock.calls).toEqual([
629 [start(expect.any(Function))],
630 [push(1)],
631 [push(2)],
632 [push(3)],
633 [push(4)],
634 [SignalKind.End],
635 ]);
636 });
637
638 // This synchronous test for switchMap will behave the same as concatMap & mergeMap
639 it('lets inner sources finish when outer source ends', () => {
640 const signals: Signal<any>[] = [];
641 const teardown = vi.fn();
642 const fn = (signal: Signal<any>) => {
643 signals.push(signal);
644 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
645 signal[0](TalkbackKind.Pull);
646 signal[0](TalkbackKind.Close);
647 }
648 };
649
650 operators.switchMap(() => {
651 return sources.make(() => teardown);
652 })(sources.fromValue(null))(fn);
653
654 expect(teardown).toHaveBeenCalled();
655 expect(signals).toEqual([start(expect.any(Function))]);
656 });
657
658 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
659 it('emits values from each flattened asynchronous source, one at a time', () => {
660 const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
661 const fn = vi.fn();
662
663 sinks.forEach(fn)(
664 operators.switchMap((x: number) =>
665 operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
666 )(source)
667 );
668
669 vi.runAllTimers();
670 expect(fn.mock.calls).toEqual([[1], [10], [20]]);
671 });
672});
673
674describe('take', () => {
675 const noop = operators.take(10);
676 passesPassivePull(noop);
677 passesActivePush(noop);
678 passesSinkClose(noop);
679 passesSourceEnd(noop);
680 passesSingleStart(noop);
681 passesStrictEnd(noop);
682 passesAsyncSequence(noop);
683
684 passesCloseAndEnd(operators.take(0));
685
686 it('emits values until a maximum is reached', () => {
687 const { source, next } = sources.makeSubject<number>();
688 const fn = vi.fn();
689
690 operators.take(1)(source)(fn);
691 next(1);
692
693 expect(fn).toHaveBeenCalledTimes(3);
694 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
695 });
696});
697
698describe('takeUntil', () => {
699 const noop = operators.takeUntil(sources.never);
700 passesPassivePull(noop);
701 passesActivePush(noop);
702 passesSinkClose(noop);
703 passesSourcePushThenEnd(noop);
704 passesSingleStart(noop);
705 passesStrictEnd(noop);
706 passesAsyncSequence(noop);
707
708 const ending = operators.takeUntil(sources.fromValue(null));
709 passesCloseAndEnd(ending);
710
711 it('emits values until a notifier emits', () => {
712 const { source: notifier$, next: notify } = sources.makeSubject<any>();
713 const { source: input$, next } = sources.makeSubject<number>();
714 const fn = vi.fn();
715
716 operators.takeUntil(notifier$)(input$)(fn);
717 next(1);
718
719 expect(fn).toHaveBeenCalledTimes(2);
720 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);
721
722 notify(null);
723 expect(fn).toHaveBeenCalledTimes(3);
724 expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
725 });
726
727 it('emits values until a notifier emits', () => {
728 const { source: input$, next } = sources.makeSubject<number>();
729 const fn = vi.fn();
730
731 let hasClosed = false;
732
733 operators.takeUntil(sink => {
734 sink(
735 start(talkback => {
736 if (talkback === TalkbackKind.Close) {
737 hasClosed = true;
738 } else if (talkback === TalkbackKind.Pull && !hasClosed) {
739 sink(push(1));
740 }
741 })
742 );
743 })(input$)(fn);
744
745 next(1);
746
747 expect(fn).toHaveBeenCalledTimes(2);
748 expect(fn.mock.calls).toEqual([[0], [start(expect.any(Function))]]);
749
750 expect(hasClosed).toBe(true);
751 });
752});
753
754describe('takeWhile', () => {
755 const noop = operators.takeWhile(() => true);
756 passesPassivePull(noop);
757 passesActivePush(noop);
758 passesSinkClose(noop);
759 passesSourceEnd(noop);
760 passesSingleStart(noop);
761 passesAsyncSequence(noop);
762
763 const ending = operators.takeWhile(() => false);
764 passesCloseAndEnd(ending);
765
766 it('emits values while a predicate passes for all values', () => {
767 const { source, next } = sources.makeSubject<number>();
768 const fn = vi.fn();
769
770 operators.takeWhile((x: any) => x < 2)(source)(fn);
771 next(1);
772 next(2);
773
774 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
775 });
776});
777
778describe('takeLast', () => {
779 passesCloseAndEnd(operators.takeLast(0));
780
781 it('emits the last max values of an ended source', () => {
782 const { source, next, complete } = sources.makeSubject<number>();
783 const signals: Signal<any>[] = [];
784
785 let talkback: TalkbackFn;
786
787 operators.takeLast(1)(source)(signal => {
788 signals.push(signal);
789 if (signal === SignalKind.End) {
790 /*noop*/
791 } else if (signal.tag === SignalKind.Start) {
792 (talkback = signal[0])(TalkbackKind.Pull);
793 } else {
794 talkback!(TalkbackKind.Pull);
795 }
796 });
797
798 next(1);
799 next(2);
800
801 expect(signals.length).toBe(0);
802 complete();
803
804 expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
805 });
806});
807
808describe('throttle', () => {
809 const noop = operators.throttle(() => 0);
810 passesPassivePull(noop);
811 passesActivePush(noop);
812 passesSinkClose(noop);
813 passesSourceEnd(noop);
814 passesSingleStart(noop);
815 passesAsyncSequence(noop);
816
817 it('should ignore emissions for a period of time after a value', () => {
818 const { source, next } = sources.makeSubject<number>();
819 const fn = vi.fn();
820
821 sinks.forEach(fn)(operators.throttle(() => 100)(source));
822
823 next(1);
824 expect(fn).toHaveBeenCalledWith(1);
825 vi.advanceTimersByTime(50);
826
827 next(2);
828 expect(fn).toHaveBeenCalledTimes(1);
829 vi.advanceTimersByTime(50);
830
831 next(3);
832 expect(fn).toHaveBeenCalledWith(3);
833 });
834});