Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.3.2 23 kB view raw
/**
 * Operator test suite.
 *
 * Each `describe` block covers one operator. It first runs the shared
 * compliance checks imported from `./compliance` against a pass-through
 * configuration of the operator (named `noop`), then adds operator-specific
 * behavioural tests.
 *
 * Fake timers are installed before every test, so time-based operators are
 * driven deterministically with `vi.advanceTimersByTime` / `vi.runAllTimers`.
 */
import { describe, it, expect, beforeEach, vi } from 'vitest';

import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
import { push, start } from '../helpers';

import {
  passesPassivePull,
  passesActivePush,
  passesSinkClose,
  passesSourceEnd,
  passesSingleStart,
  passesStrictEnd,
  passesSourcePushThenEnd,
  passesAsyncSequence,
  passesCloseAndEnd,
} from './compliance';

import * as sources from '../sources';
import * as sinks from '../sinks';
import * as operators from '../operators';

beforeEach(() => {
  vi.useFakeTimers();
});

describe('buffer', () => {
  // Notifier that pushes `null` every time it is pulled and never ends.
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.buffer(valueThenNever);

  passesPassivePull(noop, [0]);
  passesActivePush(noop, [0]);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop, [0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.buffer(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith([1, 2]);

    next(3);
    notify(null);
    expect(fn).toHaveBeenCalledWith([3]);
  });
});

describe('concatMap', () => {
  const noop = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once then closes.
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.concatMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    vi.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([[1], [2]]);

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.concatMap(() => {
        return sources.make(observer => {
          setTimeout(() => observer.next(1));
          return () => {};
        });
      })(sources.fromValue(null))
    );

    vi.runAllTimers();
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('debounce', () => {
  const noop = operators.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    vi.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    next(2);
    vi.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    complete();
    vi.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalled();
  });
});

describe('delay', () => {
  const noop = operators.delay(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.delay(100)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();

    vi.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  const noop = operators.filter(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject<boolean>();
    const fn = vi.fn();

    // The `x: true` annotation also checks that the type predicate narrows the output type.
    sinks.forEach((x: true) => {
      fn(x);
    })(operators.filter((x): x is true => !!x)(source));

    next(false);
    expect(fn).not.toHaveBeenCalled();

    next(true);
    expect(fn).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  const noop = operators.map(x => x);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  const noop = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const values: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once then closes.
    const fn = (signal: Signal<any>) => {
      values.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('onEnd', () => {
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source, next, complete } = sources.makeSubject<any>();
    const fn = vi.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    next(null);
    expect(fn).not.toHaveBeenCalled();

    complete();
    expect(fn).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  const noop = operators.onPush(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(() => {})(operators.onPush(fn)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  const noop = operators.onStart(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    let sink: Sink<any>;

    const fn = vi.fn();
    // Source that only captures its sink, so the test decides when Start is sent.
    const source: Source<any> = _sink => {
      sink = _sink;
    };

    sinks.forEach(() => {})(operators.onStart(fn)(source));

    expect(fn).not.toHaveBeenCalled();

    sink!(start(() => {}));
    expect(fn).toHaveBeenCalled();
  });
});

describe('sample', () => {
  // Notifier that pushes `null` every time it is pulled and never ends.
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.sample(valueThenNever);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.sample(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('scan', () => {
  const noop = operators.scan<any, any>((_acc, x) => x, null);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    const reducer = (acc: number, x: number) => acc + x;
    sinks.forEach(fn)(operators.scan(reducer, 0)(input$));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  const noop = operators.share;
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    let onPush = () => {};

    const source: Source<any> = operators.share(sink => {
      sink(start(() => {}));
      onPush = () => {
        sink(push([0]));
        sink(SignalKind.End);
      };
    });

    const fnA = vi.fn();
    const fnB = vi.fn();

    sinks.forEach(fnA)(source);
    sinks.forEach(fnB)(source);
    onPush();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    // `toBe` checks identity: both sinks must receive the very same array instance.
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
  });

  it('completes the source when no more sink is listening', () => {
    let onPush = () => {};

    const talkback = vi.fn();
    const source: Source<any> = operators.share(sink => {
      sink(start(talkback));
      onPush = () => {
        sink(push([0]));
        sink(push([1]));
        sink(SignalKind.End);
      };
    });

    const fnA = vi.fn();
    const fnB = vi.fn();

    sinks.forEach(fnA)(operators.take(1)(source));
    sinks.forEach(fnB)(operators.take(1)(source));
    onPush();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
    expect(talkback).toHaveBeenCalledWith(TalkbackKind.Close);
  });
});

describe('skip', () => {
  const noop = operators.skip(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skip(1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  const noop = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);
  passesStrictEnd(noop);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    notify(null);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  const noop = operators.skipWhile(() => false);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  const noop = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = vi.fn();
    // Raw sink: records every signal and, on Start, pulls once then closes.
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = vi.fn();

    sinks.forEach(fn)(
      operators.switchMap((x: number) =>
        operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
      )(source)
    );

    vi.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [20]]);
  });
});

describe('take', () => {
  const noop = operators.take(10);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.take(1)(source)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});

describe('takeUntil', () => {
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<any>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);

    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
  });

  // Previously shared its title with the test above; renamed so failure
  // reports identify which of the two cases broke.
  it('closes a notifier that emits synchronously when pulled', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    let hasClosed = false;

    // Hand-written notifier: pushes on every Pull until it has been closed.
    operators.takeUntil(sink => {
      sink(
        start(talkback => {
          if (talkback === TalkbackKind.Close) {
            hasClosed = true;
          } else if (talkback === TalkbackKind.Pull && !hasClosed) {
            sink(push(1));
          }
        })
      );
    })(input$)(fn);

    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    // The sink is expected to observe End *before* Start in this scenario;
    // the value pushed via `next(1)` is never forwarded.
    expect(fn.mock.calls).toEqual([[SignalKind.End], [start(expect.any(Function))]]);

    expect(hasClosed).toBe(true);
  });
});

describe('takeWhile', () => {
  const noop = operators.takeWhile(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeWhile((x: any) => x < 2)(source)(fn);
    next(1);
    next(2);
    next(3);

    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });

  it('emits values while a predicate passes for all values plus an additional one', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    operators.takeWhile((x: any) => x < 2, true)(source)(fn);
    next(1);
    next(2);
    next(3);

    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [SignalKind.End],
    ]);
  });
});

describe('takeLast', () => {
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const signals: Signal<any>[] = [];

    let talkback: TalkbackFn;

    // Raw sink: records every signal and pulls again after Start and each Push.
    operators.takeLast(1)(source)(signal => {
      signals.push(signal);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
      } else {
        talkback!(TalkbackKind.Pull);
      }
    });

    next(1);
    next(2);

    // Nothing (not even Start) is expected downstream before the source completes.
    expect(signals.length).toBe(0);
    complete();

    expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
  });
});

describe('throttle', () => {
  const noop = operators.throttle(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = vi.fn();

    sinks.forEach(fn)(operators.throttle(() => 100)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    vi.advanceTimersByTime(50);

    next(2);
    expect(fn).toHaveBeenCalledTimes(1);
    vi.advanceTimersByTime(50);

    next(3);
    expect(fn).toHaveBeenCalledWith(3);
  });
});