Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.0.0 33 kB view raw
import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
import { push, start } from '../helpers';

import * as sources from '../sources';
import * as sinks from '../sinks';
import * as operators from '../operators';

/* This tests a noop operator for passive Pull talkback signals.
   A Pull will be sent from the sink upwards and should pass through
   the operator until the source receives it, which then pushes a
   value down.
   `output` is the value the sink is expected to collect for the
   source's single `push(0)`; it defaults to 0. */
const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
  it('responds to Pull talkback signals (spec)', () => {
    let talkback: TalkbackFn | null = null;
    let pushes = 0;
    const values: any[] = [];

    // Pushes exactly one value, and only in response to the first Pull
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (!pushes && signal === TalkbackKind.Pull) {
            pushes++;
            sink(push(0));
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      // The source never ends, so an End signal is a failure
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        values.push(signal[0]);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately
    expect(talkback).not.toBe(null);
    // No Push signals should be issued initially
    expect(values).toEqual([]);

    // When pulling a value we expect an immediate response
    talkback!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(values).toEqual([output]);
  });
};

/* This tests a noop operator for regular, active Push signals.
   A Push will be sent downwards from the source, through the
   operator to the sink. Pull events should be let through from
   the sink after every Push event.
*/
const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
  it('responds to eager Push signals (spec)', () => {
    const values: any[] = [];
    let talkback: TalkbackFn | null = null;
    let sink: Sink<any> | null = null;
    let pulls = 0;

    // Captures the operator's sink so the test can push into it manually,
    // and counts every Pull that reaches the source
    const source: Source<any> = _sink => {
      (sink = _sink)(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    operator(source)(signal => {
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
      } else if (signal.tag === SignalKind.Push) {
        values.push(signal[0]);
        // Every received Push is answered with a Pull
        talkback!(TalkbackKind.Pull);
      }
    });

    // No Pull signals should be issued initially
    expect(pulls).toBe(0);

    // When pushing a value we expect an immediate response
    sink!(push(0));
    jest.runAllTimers();
    expect(values).toEqual([result]);
    // Subsequently the Pull signal should have travelled upwards
    expect(pulls).toBe(1);
  });
};

/* This tests a noop operator for Close talkback signals from the sink.
   A Close signal will be sent, which should be forwarded to the source,
   which then ends the communication without sending an End signal.
*/
const passesSinkClose = (operator: Operator<any, any>) => {
  it('responds to Close signals from sink (spec)', () => {
    let talkback: TalkbackFn | null = null;
    let closing = 0;

    // Answers Pulls with a value until it has been closed, and counts
    // every Close signal it receives
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull && !closing) {
            sink(push(0));
          } else if (signal === TalkbackKind.Close) {
            closing++;
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        // Close as soon as the first value arrives
        talkback!(TalkbackKind.Close);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);

    // Pulling yields a value, which the sink answers with a Close signal;
    // that Close should reach the source exactly once
    talkback!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(closing).toBe(1);
  });
};

/* This tests a noop operator for End signals from the source.
   A Push and End signal will be sent after the first Pull talkback
   signal from the sink, which shouldn't lead to any extra Close or Pull
   talkback signals.
*/
const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on immediate Push then End signals from source (spec)', () => {
    const signals: Signal<any>[] = [];
    let talkback: TalkbackFn | null = null;
    let pulls = 0;
    let ending = 0;

    // Responds to the first Pull with a Push immediately followed by End;
    // receiving a Close at any point is a failure
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal === TalkbackKind.Pull) {
            pulls++;
            if (pulls === 1) {
              sink(push(0));
              sink(SignalKind.End);
            }
          }
        })
      );
    };

    // Records Push and End signals; never pulls on its own
    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
        ending++;
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);

    // When pulling a value we expect an immediate Push then End signal
    talkback!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(ending).toBe(1);
    expect(signals).toEqual([push(result), SignalKind.End]);
    // Also no additional pull event should be created by the operator
    expect(pulls).toBe(1);
  });
};

/* This tests a noop operator for End signals from the source
   after the first pull in response to another.
   This is similar to passesSourceEnd but more well behaved since
   mergeMap/switchMap/concatMap are eager operators.
*/
const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on End signals from source (spec)', () => {
    const signals: Signal<any>[] = [];
    let talkback: TalkbackFn | null = null;
    let pulls = 0;
    let ending = 0;

    // Answers the first two Pulls with a Push each and the third with End;
    // receiving a Close at any point is a failure
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal === TalkbackKind.Pull) {
            pulls++;
            if (pulls <= 2) {
              sink(push(0));
            } else {
              sink(SignalKind.End);
            }
          }
        })
      );
    };

    // Records Push and End signals and pulls again after every Push
    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
        ending++;
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
        talkback!(TalkbackKind.Pull);
      } else {
        talkback = signal[0];
      }
    };

    operator(source)(sink);

    // The initial Pull starts a Pull/Push exchange: we expect two Push
    // signals, and the End signal in response to the third Pull
    talkback!(TalkbackKind.Pull);
    jest.runAllTimers();
    expect(ending).toBe(1);
    expect(pulls).toBe(3);
    expect(signals).toEqual([push(result), push(result), SignalKind.End]);
  });
};

/* This tests a noop operator for Start signals from the source.
   When the operator's sink is started by the source it'll receive
   a Start event. As a response it should never send more than one
   Start signal to the sink.
*/
const passesSingleStart = (operator: Operator<any, any>) => {
  it('sends a single Start event to the incoming sink (spec)', () => {
    let starts = 0;

    const source: Source<any> = sink => {
      sink(start(() => {}));
    };

    const sink: Sink<any> = signal => {
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        starts++;
      }
    };

    // When starting the operator we expect a single start event on the sink
    operator(source)(sink);
    expect(starts).toBe(1);
  });
};

/* This tests a noop operator for silence after End signals from the source.
   When the operator receives the End signal it shouldn't forward any other
   signals to the sink anymore.
   This isn't a strict requirement, but some operators should ensure that
   all sources are well behaved. This is particularly true for operators
   that either Close sources themselves or may operate on multiple sources. */
const passesStrictEnd = (operator: Operator<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    let pulls = 0;
    const signals: Signal<any>[] = [];

    // Misbehaving source: sends a Push after it has already sent End
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            pulls++;
            sink(SignalKind.End);
            sink(push(123));
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
      } else {
        signal[0](TalkbackKind.Pull);
      }
    };

    operator(source)(sink);

    // The Push signal should've been dropped
    jest.runAllTimers();
    expect(signals).toEqual([SignalKind.End]);
    expect(pulls).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const signals: Signal<any>[] = [];

    // Misbehaving source: sends a Push in response to being closed
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) {
            sink(push(123));
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
      } else {
        signal[0](TalkbackKind.Close);
      }
    };

    operator(source)(sink);

    // The Push signal should've been dropped
    jest.runAllTimers();
    expect(signals).toEqual([]);
  });
};

/* This tests an immediately closing operator for End signals to
   the sink and Close signals to the source.
   When an operator closes immediately we expect to see a Close
   signal at the source and an End signal to the sink, since the
   closing operator is expected to end the entire chain. */
const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closing = 0;
    let ending = 0;

    const source: Source<any> = sink => {
      sink(
        start(signal => {
          // For some operator tests we do need to send a single value
          if (signal === TalkbackKind.Pull) {
            sink(push(null));
          } else {
            closing++;
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        ending++;
      } else if (signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
      }
    };

    // We expect the operator to immediately end and close
    closingOperator(source)(sink);
    expect(closing).toBe(1);
    expect(ending).toBe(1);
  });
};

/* This tests a noop operator for an asynchronous sequence of signals.
   The sink sends its Pull 5ms after receiving Start. On that first Pull
   the source schedules a Push after 10ms and an End after 20ms. Both
   signals are expected to arrive at the sink, in that order. */
const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
  it('passes an async push with an async end (spec)', () => {
    let hasPushed = false;
    const signals: Signal<any>[] = [];

    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull && !hasPushed) {
            hasPushed = true;
            setTimeout(() => sink(push(0)), 10);
            setTimeout(() => sink(SignalKind.End), 20);
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
      } else if (signal.tag === SignalKind.Push) {
        signals.push(signal);
      } else {
        setTimeout(() => {
          signal[0](TalkbackKind.Pull);
        }, 5);
      }
    };

    // Nothing is received synchronously; after 5ms the delayed Pull
    // must have reached the source, and once all timers have run the
    // Push and End signals must have come in
    operator(source)(sink);
    expect(signals.length).toBe(0);
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    jest.runAllTimers();

    expect(signals).toEqual([push(result), SignalKind.End]);
  });
};

beforeEach(() => {
  jest.useFakeTimers();
});

describe('combine', () => {
  const noop = (source: Source<any>) => operators.combine(sources.fromValue(0), source);

  passesPassivePull(noop, [0, 0]);
  passesActivePush(noop, [0, 0]);
  passesSinkClose(noop);
  passesSourceEnd(noop, [0, 0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the zipped values of two sources', () => {
    const { source: sourceA, next: nextA } = sources.makeSubject();
    const { source: sourceB, next: nextB } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.combine(sourceA, sourceB));

    nextA(1);
    expect(fn).not.toHaveBeenCalled();
    nextB(2);
    expect(fn).toHaveBeenCalledWith([1, 2]);
  });
});

describe('buffer', () => {
  // Notifier that answers every Pull with a value and never ends
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.buffer(valueThenNever);

  passesPassivePull(noop, [0]);
  passesActivePush(noop, [0]);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop, [0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.buffer(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith([1, 2]);

    next(3);
    notify(null);
    expect(fn).toHaveBeenCalledWith([3]);
  });
});

describe('concatMap', () => {
  const noop = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = jest.fn();
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.concatMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    jest.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([[1], [2]]);

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
  });

  it('works for fully asynchronous sources', () => {
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.concatMap(() => {
        return sources.make(observer => {
          setTimeout(() => observer.next(1));
          return () => {};
        });
      })(sources.fromValue(null))
    );

    jest.runAllTimers();
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('debounce', () => {
  const noop = operators.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    jest.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    next(2);
    jest.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.debounce(() => 100)(source));

    next(1);
    complete();
    jest.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalled();
  });
});

describe('delay', () => {
  const noop = operators.delay(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.delay(100)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  const noop = operators.filter(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.filter(x => !!x)(source));

    next(false);
    expect(fn).not.toHaveBeenCalled();

    next(true);
    expect(fn).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  const noop = operators.map(x => x);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  const noop = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const values: Signal<any>[] = [];
    const teardown = jest.fn();
    const fn = (signal: Signal<any>) => {
      values.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return operators.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
  });

  it('emits synchronous values in order', () => {
    const values: any[] = [];

    sinks.forEach(x => values.push(x))(
      operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
    );

    expect(values).toEqual([1, 2, 3, 4]);
  });
});

describe('onEnd', () => {
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source, next, complete } = sources.makeSubject<any>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    next(null);
    expect(fn).not.toHaveBeenCalled();

    complete();
    expect(fn).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  const noop = operators.onPush(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onPush(fn)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  const noop = operators.onStart(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    let sink: Sink<any>;

    const fn = jest.fn();
    // Only captures the sink, so that Start can be sent manually later
    const source: Source<any> = _sink => {
      sink = _sink;
    };

    sinks.forEach(() => {})(operators.onStart(fn)(source));

    expect(fn).not.toHaveBeenCalled();

    sink!(start(() => {}));
    expect(fn).toHaveBeenCalled();
  });
});

describe('sample', () => {
  // Notifier that answers every Pull with a value and never ends
  const valueThenNever: Source<any> = sink =>
    sink(
      start(signal => {
        if (signal === TalkbackKind.Pull) sink(push(null));
      })
    );

  const noop = operators.sample(valueThenNever);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.sample(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('scan', () => {
  const noop = operators.scan<any, any>((_acc, x) => x, null);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    const reducer = (acc: number, x: number) => acc + x;
    sinks.forEach(fn)(operators.scan(reducer, 0)(input$));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  const noop = operators.share;
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    let onPush = () => {};

    const source: Source<any> = operators.share(sink => {
      sink(start(() => {}));
      onPush = () => {
        sink(push([0]));
        sink(SignalKind.End);
      };
    });

    const fnA = jest.fn();
    const fnB = jest.fn();

    sinks.forEach(fnA)(source);
    sinks.forEach(fnB)(source);
    onPush();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    // Both sinks must have received the very same array instance
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
  });
});

describe('skip', () => {
  const noop = operators.skip(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skip(1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  const noop = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);
  passesStrictEnd(noop);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    notify(null);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  const noop = operators.skipWhile(() => false);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipWhile((x: any) => x <= 1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  const noop = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [start(expect.any(Function))],
      [push(1)],
      [push(2)],
      [push(3)],
      [push(4)],
      [SignalKind.End],
    ]);
  });

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const signals: Signal<any>[] = [];
    const teardown = jest.fn();
    const fn = (signal: Signal<any>) => {
      signals.push(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        signal[0](TalkbackKind.Pull);
        signal[0](TalkbackKind.Close);
      }
    };

    operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(signals).toEqual([start(expect.any(Function))]);
  });

  // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = operators.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.switchMap((x: number) =>
        operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
      )(source)
    );

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([[1], [10], [20]]);
  });
});

describe('take', () => {
  const noop = operators.take(10);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.take(1)(source)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});

describe('takeUntil', () => {
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<any>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);

    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
  });
});

describe('takeWhile', () => {
  const noop = operators.takeWhile(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.takeWhile((x: any) => x < 2)(source)(fn);
    next(1);
    next(2);

    expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
  });
});

describe('takeLast', () => {
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const signals: Signal<any>[] = [];

    let talkback: TalkbackFn;

    operators.takeLast(1)(source)(signal => {
      signals.push(signal);
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
      } else {
        talkback!(TalkbackKind.Pull);
      }
    });

    next(1);
    next(2);

    // Nothing, not even Start, is expected before the source completes
    expect(signals.length).toBe(0);
    complete();

    expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
  });
});

describe('throttle', () => {
  const noop = operators.throttle(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.throttle(() => 100)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    next(2);
    expect(fn).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    next(3);
    expect(fn).toHaveBeenCalledWith(3);
  });
});