Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { describe, it, expect, beforeEach, vi } from 'vitest'; 2 3import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types'; 4import { push, start } from '../helpers'; 5 6import { 7 passesPassivePull, 8 passesActivePush, 9 passesSinkClose, 10 passesSourceEnd, 11 passesSingleStart, 12 passesStrictEnd, 13 passesSourcePushThenEnd, 14 passesAsyncSequence, 15 passesCloseAndEnd, 16} from './compliance'; 17 18import * as sources from '../sources'; 19import * as sinks from '../sinks'; 20import * as operators from '../operators'; 21 22beforeEach(() => { 23 vi.useFakeTimers(); 24}); 25 26describe('buffer', () => { 27 const valueThenNever: Source<any> = sink => 28 sink( 29 start(signal => { 30 if (signal === TalkbackKind.Pull) sink(push(null)); 31 }) 32 ); 33 34 const noop = operators.buffer(valueThenNever); 35 36 passesPassivePull(noop, [0]); 37 passesActivePush(noop, [0]); 38 passesSinkClose(noop); 39 passesSourcePushThenEnd(noop, [0]); 40 passesSingleStart(noop); 41 passesStrictEnd(noop); 42 43 it('emits batches of input values when a notifier emits', () => { 44 const { source: notifier$, next: notify } = sources.makeSubject(); 45 const { source: input$, next } = sources.makeSubject(); 46 const fn = vi.fn(); 47 48 sinks.forEach(fn)(operators.buffer(notifier$)(input$)); 49 50 next(1); 51 next(2); 52 expect(fn).not.toHaveBeenCalled(); 53 54 notify(null); 55 expect(fn).toHaveBeenCalledWith([1, 2]); 56 57 next(3); 58 notify(null); 59 expect(fn).toHaveBeenCalledWith([3]); 60 }); 61}); 62 63describe('concatMap', () => { 64 const noop = operators.concatMap(x => sources.fromValue(x)); 65 passesPassivePull(noop); 66 passesActivePush(noop); 67 passesSinkClose(noop); 68 passesSourcePushThenEnd(noop); 69 passesSingleStart(noop); 70 passesStrictEnd(noop); 71 passesAsyncSequence(noop); 72 73 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 74 it('emits values from each flattened synchronous source', () => { 75 const { source, next, complete } = sources.makeSubject<number>(); 76 const fn = vi.fn(); 77 78 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 79 80 next(1); 81 next(3); 82 complete(); 83 84 expect(fn).toHaveBeenCalledTimes(6); 85 expect(fn.mock.calls).toEqual([ 86 [start(expect.any(Function))], 87 [push(1)], 88 [push(2)], 89 [push(3)], 90 [push(4)], 91 [SignalKind.End], 92 ]); 93 }); 94 95 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 96 it('lets inner sources finish when outer source ends', () => { 97 const signals: Signal<any>[] = []; 98 const teardown = vi.fn(); 99 const fn = (signal: Signal<any>) => { 100 signals.push(signal); 101 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 102 signal[0](TalkbackKind.Pull); 103 signal[0](TalkbackKind.Close); 104 } 105 }; 106 107 operators.concatMap(() => { 108 return sources.make(() => teardown); 109 })(sources.fromValue(null))(fn); 110 111 expect(teardown).toHaveBeenCalled(); 112 expect(signals).toEqual([start(expect.any(Function))]); 113 }); 114 115 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap 116 it('emits values from each flattened asynchronous source, one at a time', () => { 117 const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 118 const fn = vi.fn(); 119 120 sinks.forEach(fn)( 121 operators.concatMap((x: number) => { 122 return operators.delay(5)(sources.fromArray([x, x * 2])); 123 })(source) 124 ); 125 126 vi.advanceTimersByTime(14); 127 expect(fn.mock.calls).toEqual([[1], [2]]); 128 129 vi.runAllTimers(); 130 expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]); 131 }); 132 133 it('works for fully asynchronous sources', () => { 134 const fn = vi.fn(); 135 136 sinks.forEach(fn)( 137 operators.concatMap(() => { 138 return sources.make(observer => { 139 setTimeout(() => observer.next(1)); 140 return () => {}; 141 }); 142 })(sources.fromValue(null)) 143 ); 144 145 vi.runAllTimers(); 146 expect(fn).toHaveBeenCalledWith(1); 147 }); 148 149 it('emits synchronous values in order', () => { 150 const values: any[] = []; 151 152 sinks.forEach(x => values.push(x))( 153 operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])]) 154 ); 155 156 expect(values).toEqual([1, 2, 3, 4]); 157 }); 158}); 159 160describe('debounce', () => { 161 const noop = operators.debounce(() => 0); 162 passesPassivePull(noop); 163 passesActivePush(noop); 164 passesSinkClose(noop); 165 passesSourceEnd(noop); 166 passesSingleStart(noop); 167 passesStrictEnd(noop); 168 passesAsyncSequence(noop); 169 170 it('waits for a specified amount of silence before emitting the last value', () => { 171 const { source, next } = sources.makeSubject<number>(); 172 const fn = vi.fn(); 173 174 sinks.forEach(fn)(operators.debounce(() => 100)(source)); 175 176 next(1); 177 vi.advanceTimersByTime(50); 178 expect(fn).not.toHaveBeenCalled(); 179 180 next(2); 181 vi.advanceTimersByTime(99); 182 expect(fn).not.toHaveBeenCalled(); 183 184 vi.advanceTimersByTime(1); 185 expect(fn).toHaveBeenCalledWith(2); 186 }); 187 188 it('emits debounced value with delayed End signal', () => { 189 const { source, next, complete } = sources.makeSubject<number>(); 190 const fn = vi.fn(); 191 192 sinks.forEach(fn)(operators.debounce(() => 100)(source)); 193 194 next(1); 195 complete(); 196 vi.advanceTimersByTime(100); 197 expect(fn).toHaveBeenCalled(); 198 }); 199}); 200 201describe('delay', () => { 202 const noop = operators.delay(0); 203 passesPassivePull(noop); 204 passesActivePush(noop); 205 passesSinkClose(noop); 206 passesSourceEnd(noop); 207 passesSingleStart(noop); 208 passesAsyncSequence(noop); 209 210 it('delays outputs by a specified delay timeout value', () => { 211 const { source, next } = sources.makeSubject(); 212 const fn = vi.fn(); 213 214 sinks.forEach(fn)(operators.delay(100)(source)); 215 216 next(1); 217 expect(fn).not.toHaveBeenCalled(); 218 219 vi.advanceTimersByTime(100); 220 expect(fn).toHaveBeenCalledWith(1); 221 }); 222}); 223 224describe('filter', () => { 225 const noop = operators.filter(() => true); 226 passesPassivePull(noop); 227 passesActivePush(noop); 228 passesSinkClose(noop); 229 passesSourceEnd(noop); 230 passesSingleStart(noop); 231 passesAsyncSequence(noop); 232 233 it('prevents emissions for which a predicate fails', () => { 234 const { source, next } = sources.makeSubject(); 235 const fn = vi.fn(); 236 237 sinks.forEach(fn)(operators.filter(x => !!x)(source)); 238 239 next(false); 240 expect(fn).not.toHaveBeenCalled(); 241 242 next(true); 243 expect(fn).toHaveBeenCalledWith(true); 244 }); 245}); 246 247describe('map', () => { 248 const noop = operators.map(x => x); 249 passesPassivePull(noop); 250 passesActivePush(noop); 251 passesSinkClose(noop); 252 passesSourceEnd(noop); 253 passesSingleStart(noop); 254 passesAsyncSequence(noop); 255 256 it('maps over values given a transform function', () => { 257 const { source, next } = sources.makeSubject<number>(); 258 const fn = vi.fn(); 259 260 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source)); 261 262 next(1); 263 expect(fn).toHaveBeenCalledWith(2); 264 }); 265}); 266 267describe('mergeMap', () => { 268 const noop = operators.mergeMap(x => sources.fromValue(x)); 269 passesPassivePull(noop); 270 passesActivePush(noop); 271 passesSinkClose(noop); 272 passesSourcePushThenEnd(noop); 273 passesSingleStart(noop); 274 passesStrictEnd(noop); 275 passesAsyncSequence(noop); 276 277 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 278 it('emits values from each flattened synchronous source', () => { 279 const { source, next, complete } = sources.makeSubject<number>(); 280 const fn = vi.fn(); 281 282 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 283 284 next(1); 285 next(3); 286 complete(); 287 288 expect(fn.mock.calls).toEqual([ 289 [start(expect.any(Function))], 290 [push(1)], 291 [push(2)], 292 [push(3)], 293 [push(4)], 294 [SignalKind.End], 295 ]); 296 }); 297 298 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 299 it('lets inner sources finish when outer source ends', () => { 300 const values: Signal<any>[] = []; 301 const teardown = vi.fn(); 302 const fn = (signal: Signal<any>) => { 303 values.push(signal); 304 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 305 signal[0](TalkbackKind.Pull); 306 signal[0](TalkbackKind.Close); 307 } 308 }; 309 310 operators.mergeMap(() => { 311 return sources.make(() => teardown); 312 })(sources.fromValue(null))(fn); 313 314 expect(teardown).toHaveBeenCalled(); 315 expect(values).toEqual([start(expect.any(Function))]); 316 }); 317 318 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap 319 it('emits values from each flattened asynchronous source simultaneously', () => { 320 const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 321 const fn = vi.fn(); 322 323 sinks.forEach(fn)( 324 operators.mergeMap((x: number) => { 325 return operators.delay(5)(sources.fromArray([x, x * 2])); 326 })(source) 327 ); 328 329 vi.runAllTimers(); 330 expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]); 331 }); 332 333 it('emits synchronous values in order', () => { 334 const values: any[] = []; 335 336 sinks.forEach(x => values.push(x))( 337 operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])]) 338 ); 339 340 expect(values).toEqual([1, 2, 3, 4]); 341 }); 342}); 343 344describe('onEnd', () => { 345 const noop = operators.onEnd(() => {}); 346 passesPassivePull(noop); 347 passesActivePush(noop); 348 passesSinkClose(noop); 349 passesSourceEnd(noop); 350 passesStrictEnd(noop); 351 passesSingleStart(noop); 352 passesAsyncSequence(noop); 353 354 it('calls a callback when the source ends', () => { 355 const { source, next, complete } = sources.makeSubject<any>(); 356 const fn = vi.fn(); 357 358 sinks.forEach(() => {})(operators.onEnd(fn)(source)); 359 360 next(null); 361 expect(fn).not.toHaveBeenCalled(); 362 363 complete(); 364 expect(fn).toHaveBeenCalled(); 365 }); 366}); 367 368describe('onPush', () => { 369 const noop = operators.onPush(() => {}); 370 passesPassivePull(noop); 371 passesActivePush(noop); 372 passesSinkClose(noop); 373 passesSourceEnd(noop); 374 passesStrictEnd(noop); 375 passesSingleStart(noop); 376 passesAsyncSequence(noop); 377 378 it('calls a callback when the source emits', () => { 379 const { source, next } = sources.makeSubject<number>(); 380 const fn = vi.fn(); 381 382 sinks.forEach(() => {})(operators.onPush(fn)(source)); 383 384 next(1); 385 expect(fn).toHaveBeenCalledWith(1); 386 next(2); 387 expect(fn).toHaveBeenCalledWith(2); 388 }); 389 390 it('is the same as `tap`', () => { 391 expect(operators.onPush).toBe(operators.tap); 392 }); 393}); 394 395describe('onStart', () => { 396 const noop = operators.onStart(() => {}); 397 passesPassivePull(noop); 398 passesActivePush(noop); 399 passesSinkClose(noop); 400 passesSourceEnd(noop); 401 passesSingleStart(noop); 402 passesAsyncSequence(noop); 403 404 it('is called when the source starts', () => { 405 let sink: Sink<any>; 406 407 const fn = vi.fn(); 408 const source: Source<any> = _sink => { 409 sink = _sink; 410 }; 411 412 sinks.forEach(() => {})(operators.onStart(fn)(source)); 413 414 expect(fn).not.toHaveBeenCalled(); 415 416 sink!(start(() => {})); 417 expect(fn).toHaveBeenCalled(); 418 }); 419}); 420 421describe('sample', () => { 422 const valueThenNever: Source<any> = sink => 423 sink( 424 start(signal => { 425 if (signal === TalkbackKind.Pull) sink(push(null)); 426 }) 427 ); 428 429 const noop = operators.sample(valueThenNever); 430 431 passesPassivePull(noop); 432 passesActivePush(noop); 433 passesSinkClose(noop); 434 passesSourcePushThenEnd(noop); 435 passesSingleStart(noop); 436 passesStrictEnd(noop); 437 438 it('emits the latest value when a notifier source emits', () => { 439 const { source: notifier$, next: notify } = sources.makeSubject(); 440 const { source: input$, next } = sources.makeSubject(); 441 const fn = vi.fn(); 442 443 sinks.forEach(fn)(operators.sample(notifier$)(input$)); 444 445 next(1); 446 next(2); 447 expect(fn).not.toHaveBeenCalled(); 448 449 notify(null); 450 expect(fn).toHaveBeenCalledWith(2); 451 }); 452}); 453 454describe('scan', () => { 455 const noop = operators.scan<any, any>((_acc, x) => x, null); 456 passesPassivePull(noop); 457 passesActivePush(noop); 458 passesSinkClose(noop); 459 passesSourceEnd(noop); 460 passesSingleStart(noop); 461 passesAsyncSequence(noop); 462 463 it('folds values continuously with a reducer and initial value', () => { 464 const { source: input$, next } = sources.makeSubject<number>(); 465 const fn = vi.fn(); 466 467 const reducer = (acc: number, x: number) => acc + x; 468 sinks.forEach(fn)(operators.scan(reducer, 0)(input$)); 469 470 next(1); 471 expect(fn).toHaveBeenCalledWith(1); 472 next(2); 473 expect(fn).toHaveBeenCalledWith(3); 474 }); 475}); 476 477describe('share', () => { 478 const noop = operators.share; 479 passesPassivePull(noop); 480 passesActivePush(noop); 481 passesSinkClose(noop); 482 passesSourceEnd(noop); 483 passesSingleStart(noop); 484 passesStrictEnd(noop); 485 passesAsyncSequence(noop); 486 487 it('shares output values between sinks', () => { 488 let onPush = () => {}; 489 490 const source: Source<any> = operators.share(sink => { 491 sink(start(() => {})); 492 onPush = () => { 493 sink(push([0])); 494 sink(SignalKind.End); 495 }; 496 }); 497 498 const fnA = vi.fn(); 499 const fnB = vi.fn(); 500 501 sinks.forEach(fnA)(source); 502 sinks.forEach(fnB)(source); 503 onPush(); 504 505 expect(fnA).toHaveBeenCalledWith([0]); 506 expect(fnB).toHaveBeenCalledWith([0]); 507 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]); 508 }); 509}); 510 511describe('skip', () => { 512 const noop = operators.skip(0); 513 passesPassivePull(noop); 514 passesActivePush(noop); 515 passesSinkClose(noop); 516 passesSourceEnd(noop); 517 passesSingleStart(noop); 518 passesAsyncSequence(noop); 519 520 it('skips a number of values before emitting normally', () => { 521 const { source, next } = sources.makeSubject<number>(); 522 const fn = vi.fn(); 523 524 sinks.forEach(fn)(operators.skip(1)(source)); 525 526 next(1); 527 expect(fn).not.toHaveBeenCalled(); 528 next(2); 529 expect(fn).toHaveBeenCalledWith(2); 530 }); 531}); 532 533describe('skipUntil', () => { 534 const noop = operators.skipUntil(sources.fromValue(null)); 535 passesPassivePull(noop); 536 passesActivePush(noop); 537 passesSinkClose(noop); 538 passesSourceEnd(noop); 539 passesSingleStart(noop); 540 passesAsyncSequence(noop); 541 passesStrictEnd(noop); 542 543 it('skips values until the notifier source emits', () => { 544 const { source: notifier$, next: notify } = sources.makeSubject(); 545 const { source: input$, next } = sources.makeSubject<number>(); 546 const fn = vi.fn(); 547 548 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$)); 549 550 next(1); 551 expect(fn).not.toHaveBeenCalled(); 552 notify(null); 553 next(2); 554 expect(fn).toHaveBeenCalledWith(2); 555 }); 556}); 557 558describe('skipWhile', () => { 559 const noop = operators.skipWhile(() => false); 560 passesPassivePull(noop); 561 passesActivePush(noop); 562 passesSinkClose(noop); 563 passesSourceEnd(noop); 564 passesSingleStart(noop); 565 passesAsyncSequence(noop); 566 567 it('skips values until one fails a predicate', () => { 568 const { source, next } = sources.makeSubject<number>(); 569 const fn = vi.fn(); 570 571 sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source)); 572 573 next(1); 574 expect(fn).not.toHaveBeenCalled(); 575 next(2); 576 expect(fn).toHaveBeenCalledWith(2); 577 }); 578}); 579 580describe('switchMap', () => { 581 const noop = operators.switchMap(x => sources.fromValue(x)); 582 passesPassivePull(noop); 583 passesActivePush(noop); 584 passesSinkClose(noop); 585 passesSourcePushThenEnd(noop); 586 passesSingleStart(noop); 587 passesStrictEnd(noop); 588 passesAsyncSequence(noop); 589 590 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 591 it('emits values from each flattened synchronous source', () => { 592 const { source, next, complete } = sources.makeSubject<number>(); 593 const fn = vi.fn(); 594 595 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 596 597 next(1); 598 next(3); 599 complete(); 600 601 expect(fn).toHaveBeenCalledTimes(6); 602 expect(fn.mock.calls).toEqual([ 603 [start(expect.any(Function))], 604 [push(1)], 605 [push(2)], 606 [push(3)], 607 [push(4)], 608 [SignalKind.End], 609 ]); 610 }); 611 612 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 613 it('lets inner sources finish when outer source ends', () => { 614 const signals: Signal<any>[] = []; 615 const teardown = vi.fn(); 616 const fn = (signal: Signal<any>) => { 617 signals.push(signal); 618 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 619 signal[0](TalkbackKind.Pull); 620 signal[0](TalkbackKind.Close); 621 } 622 }; 623 624 operators.switchMap(() => { 625 return sources.make(() => teardown); 626 })(sources.fromValue(null))(fn); 627 628 expect(teardown).toHaveBeenCalled(); 629 expect(signals).toEqual([start(expect.any(Function))]); 630 }); 631 632 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap 633 it('emits values from each flattened asynchronous source, one at a time', () => { 634 const source = operators.delay<number>(4)(sources.fromArray([1, 10])); 635 const fn = vi.fn(); 636 637 sinks.forEach(fn)( 638 operators.switchMap((x: number) => 639 operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5))) 640 )(source) 641 ); 642 643 vi.runAllTimers(); 644 expect(fn.mock.calls).toEqual([[1], [10], [20]]); 645 }); 646}); 647 648describe('take', () => { 649 const noop = operators.take(10); 650 passesPassivePull(noop); 651 passesActivePush(noop); 652 passesSinkClose(noop); 653 passesSourceEnd(noop); 654 passesSingleStart(noop); 655 passesStrictEnd(noop); 656 passesAsyncSequence(noop); 657 658 passesCloseAndEnd(operators.take(0)); 659 660 it('emits values until a maximum is reached', () => { 661 const { source, next } = sources.makeSubject<number>(); 662 const fn = vi.fn(); 663 664 operators.take(1)(source)(fn); 665 next(1); 666 667 expect(fn).toHaveBeenCalledTimes(3); 668 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]); 669 }); 670}); 671 672describe('takeUntil', () => { 673 const noop = operators.takeUntil(sources.never); 674 passesPassivePull(noop); 675 passesActivePush(noop); 676 passesSinkClose(noop); 677 passesSourcePushThenEnd(noop); 678 passesSingleStart(noop); 679 passesStrictEnd(noop); 680 passesAsyncSequence(noop); 681 682 const ending = operators.takeUntil(sources.fromValue(null)); 683 passesCloseAndEnd(ending); 684 685 it('emits values until a notifier emits', () => { 686 const { source: notifier$, next: notify } = sources.makeSubject<any>(); 687 const { source: input$, next } = sources.makeSubject<number>(); 688 const fn = vi.fn(); 689 690 operators.takeUntil(notifier$)(input$)(fn); 691 next(1); 692 693 expect(fn).toHaveBeenCalledTimes(2); 694 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]); 695 696 notify(null); 697 expect(fn).toHaveBeenCalledTimes(3); 698 expect(fn.mock.calls[2][0]).toEqual(SignalKind.End); 699 }); 700}); 701 702describe('takeWhile', () => { 703 const noop = operators.takeWhile(() => true); 704 passesPassivePull(noop); 705 passesActivePush(noop); 706 passesSinkClose(noop); 707 passesSourceEnd(noop); 708 passesSingleStart(noop); 709 passesAsyncSequence(noop); 710 711 const ending = operators.takeWhile(() => false); 712 passesCloseAndEnd(ending); 713 714 it('emits values while a predicate passes for all values', () => { 715 const { source, next } = sources.makeSubject<number>(); 716 const fn = vi.fn(); 717 718 operators.takeWhile((x: any) => x < 2)(source)(fn); 719 next(1); 720 next(2); 721 722 expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]); 723 }); 724}); 725 726describe('takeLast', () => { 727 passesCloseAndEnd(operators.takeLast(0)); 728 729 it('emits the last max values of an ended source', () => { 730 const { source, next, complete } = sources.makeSubject<number>(); 731 const signals: Signal<any>[] = []; 732 733 let talkback: TalkbackFn; 734 735 operators.takeLast(1)(source)(signal => { 736 signals.push(signal); 737 if (signal === SignalKind.End) { 738 /*noop*/ 739 } else if (signal.tag === SignalKind.Start) { 740 (talkback = signal[0])(TalkbackKind.Pull); 741 } else { 742 talkback!(TalkbackKind.Pull); 743 } 744 }); 745 746 next(1); 747 next(2); 748 749 expect(signals.length).toBe(0); 750 complete(); 751 752 expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]); 753 }); 754}); 755 756describe('throttle', () => { 757 const noop = operators.throttle(() => 0); 758 passesPassivePull(noop); 759 passesActivePush(noop); 760 passesSinkClose(noop); 761 passesSourceEnd(noop); 762 passesSingleStart(noop); 763 passesAsyncSequence(noop); 764 765 it('should ignore emissions for a period of time after a value', () => { 766 const { source, next } = sources.makeSubject<number>(); 767 const fn = vi.fn(); 768 769 sinks.forEach(fn)(operators.throttle(() => 100)(source)); 770 771 next(1); 772 expect(fn).toHaveBeenCalledWith(1); 773 vi.advanceTimersByTime(50); 774 775 next(2); 776 expect(fn).toHaveBeenCalledTimes(1); 777 vi.advanceTimersByTime(50); 778 779 next(3); 780 expect(fn).toHaveBeenCalledWith(3); 781 }); 782});