Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.2.0 22 kB view raw
/**
 * Behavioural tests for the stream operators.
 *
 * Each `describe` block first runs the shared compliance checks imported from
 * `./compliance` against a pass-through ("noop") configuration of the operator,
 * and then adds operator-specific tests.
 */
import { describe, it, expect, beforeEach, vi } from 'vitest';

import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
import { push, start } from '../helpers';

import {
  passesPassivePull,
  passesActivePush,
  passesSinkClose,
  passesSourceEnd,
  passesSingleStart,
  passesStrictEnd,
  passesSourcePushThenEnd,
  passesAsyncSequence,
  passesCloseAndEnd,
} from './compliance';

import * as sources from '../sources';
import * as sinks from '../sinks';
import * as operators from '../operators';

// Fake timers are installed before every test so that the timer-based tests can
// be driven with `vi.advanceTimersByTime` / `vi.runAllTimers`.
beforeEach(() => {
  vi.useFakeTimers();
});

describe('buffer', () => {
  // Notifier source that pushes `null` every time it is pulled and never ends.
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.buffer(valueThenNever);

  passesPassivePull(noop, [0]);
  passesActivePush(noop, [0]);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop, [0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.buffer(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith([1, 2]);

    next(3);
    notify(null);
    expect(fn).toHaveBeenCalledWith([3]);
  });
});

describe('concatMap', () => {
  const noop = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once and then closes.
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.concatMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    vi.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([[1], [2]]);

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.concatMap(() => {
        return sources.make(observer => {
          setTimeout(() => observer.next(1));
          return () => {};
        });
      })(sources.fromValue(null))
    );

    vi.runAllTimers();
    expect(fn).toHaveBeenCalledWith(1);
  });

  // Exercises `operators.concat` (the array-of-sources form) rather than `concatMap` itself.
  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('debounce', () => {
  const noop = operators.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    vi.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    next(2);
    vi.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    complete();
    vi.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalled();
  });
});

describe('delay', () => {
  const noop = operators.delay(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.delay(100)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  const noop = operators.filter(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.filter(x => !!x)(source));

    next(false);
    expect(fn).not.toHaveBeenCalled();

    next(true);
    expect(fn).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  const noop = operators.map(x => x);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  const noop = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const values: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once and then closes.
    const fn = (signal: Signal<any>) => {
      values.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  // Exercises `operators.merge` (the array-of-sources form) rather than `mergeMap` itself.
  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('onEnd', () => {
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source, next, complete } = sources.makeSubject<any>();
    const fn = vi.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    next(null);
    expect(fn).not.toHaveBeenCalled();

    complete();
    expect(fn).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  const noop = operators.onPush(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(() => {})(operators.onPush(fn)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  const noop = operators.onStart(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    let sink: Sink<any>;

    const fn = vi.fn();
    // Source that only captures its sink, so the test decides when Start is sent.
    const source: Source<any> = _sink => {
      sink = _sink;
    };

    sinks.forEach(() => {})(operators.onStart(fn)(source));

    expect(fn).not.toHaveBeenCalled();

    sink!(start(() => {}));
    expect(fn).toHaveBeenCalled();
  });
});

describe('sample', () => {
  // Notifier source that pushes `null` every time it is pulled and never ends.
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.sample(valueThenNever);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.sample(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('scan', () => {
  const noop = operators.scan<any, any>((_acc, x) => x, null);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    const reducer = (acc: number, x: number) => acc + x;
    sinks.forEach(fn)(operators.scan(reducer, 0)(input$));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  const noop = operators.share;
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    let onPush = () => {};

    const source: Source<any> = operators.share(sink => {
      sink(start(() => {}));
      onPush = () => {
        sink(push([0]));
        sink(SignalKind.End);
      };
    });

    const fnA = vi.fn();
    const fnB = vi.fn();

    sinks.forEach(fnA)(source);
    sinks.forEach(fnB)(source);
    onPush();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    // `toBe`: both sinks must receive the very same array instance, not just equal values.
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
  });

  it('completes the source when no more sink is listening', () => {
    let onPush = () => {};

    const talkback = vi.fn();
    const source: Source<any> = operators.share(sink => {
      sink(start(talkback));
      onPush = () => {
        sink(push([0]));
        sink(push([1]));
        sink(SignalKind.End);
      };
    });

    const fnA = vi.fn();
    const fnB = vi.fn();

    sinks.forEach(fnA)(operators.take(1)(source));
    sinks.forEach(fnB)(operators.take(1)(source));
    onPush();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
    expect(talkback).toHaveBeenCalledWith(TalkbackKind.Close);
  });
});

describe('skip', () => {
  const noop = operators.skip(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skip(1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  const noop = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);
  passesStrictEnd(noop);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    notify(null);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  const noop = operators.skipWhile(() => false);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  const noop = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once and then closes.
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.switchMap((x: number) =>
        operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
      )(source)
    );

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [20]]);
  });
});

describe('take', () => {
  const noop = operators.take(10);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.take(1)(source)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});

describe('takeUntil', () => {
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<any>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);

    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
  });

  // Previously shared its title with the test above; renamed so the two can be
  // told apart in reports and selected individually.
  it('ends immediately and closes the notifier when the notifier emits synchronously', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    let hasClosed = false;

    // Notifier that pushes as soon as it is pulled and records when it is closed.
    operators.takeUntil(sink => {
      sink(
        start(talkback => {
          if (talkback === TalkbackKind.Close) {
            hasClosed = true;
          } else if (talkback === TalkbackKind.Pull && !hasClosed) {
            sink(push(1));
          }
        })
      );
    })(input$)(fn);

    // Pushed after the operator has already ended; must not reach the sink.
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    // Expected order: End is observed before Start. `SignalKind.End` replaces the
    // bare literal `0`, matching how every other test in this file spells it.
    expect(fn.mock.calls).toEqual([[SignalKind.End], [start(expect.any(Function))]]);

    expect(hasClosed).toBe(true);
  });
});

describe('takeWhile', () => {
  const noop = operators.takeWhile(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeWhile((x: any) => x < 2)(source)(fn);
    next(1);
    next(2);

    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});

describe('takeLast', () => {
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const signals: Signal<any>[] = [];

    let talkback: TalkbackFn;

    // Raw sink: records every signal and pulls again after Start and after each Push.
    operators.takeLast(1)(source)(signal => {
      signals.push(signal);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
      } else {
        talkback!(TalkbackKind.Pull);
      }
    });

    next(1);
    next(2);

    // Nothing (not even Start) is forwarded until the input completes.
    expect(signals.length).toBe(0);
    complete();

    expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
  });
});

describe('throttle', () => {
  const noop = operators.throttle(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.throttle(() => 100)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    vi.advanceTimersByTime(50);

    next(2);
    expect(fn).toHaveBeenCalledTimes(1);
    vi.advanceTimersByTime(50);

    next(3);
    expect(fn).toHaveBeenCalledWith(3);
  });
});