Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import * as deriving from './helpers/wonka_deriving'; 2import * as sources from './wonka_sources.gen'; 3import * as sinks from './wonka_sinks.gen'; 4import * as operators from './wonka_operators.gen'; 5import * as web from './web/wonkaJs.gen'; 6import * as types from './wonka_types.gen'; 7 8/* This tests a noop operator for passive Pull talkback signals. 9 A Pull will be sent from the sink upwards and should pass through 10 the operator until the source receives it, which then pushes a 11 value down. */ 12const passesPassivePull = ( 13 operator: types.operatorT<any, any>, 14 output: any = 0 15) => 16 it('responds to Pull talkback signals (spec)', () => { 17 let talkback = null; 18 let push = 0; 19 const values = []; 20 21 const source: types.sourceT<any> = sink => { 22 sink(deriving.start(tb => { 23 if (!push && tb === deriving.pull) { 24 push++; 25 sink(deriving.push(0)); 26 } 27 })); 28 }; 29 30 const sink: types.sinkT<any> = signal => { 31 expect(deriving.isEnd(signal)).toBeFalsy(); 32 if (deriving.isPush(signal)) { 33 values.push(deriving.unboxPush(signal)); 34 } else if (deriving.isStart(signal)) { 35 talkback = deriving.unboxStart(signal); 36 } 37 }; 38 39 operator(source)(sink); 40 // The Start signal should always come in immediately 41 expect(talkback).not.toBe(null); 42 // No Push signals should be issued initially 43 expect(values).toEqual([]); 44 45 // When pulling a value we expect an immediate response 46 talkback(deriving.pull); 47 jest.runAllTimers(); 48 expect(values).toEqual([output]); 49 }); 50 51/* This tests a noop operator for regular, active Push signals. 52 A Push will be sent downwards from the source, through the 53 operator to the sink. Pull events should be let through from 54 the sink after every Push event. */ 55const passesActivePush = ( 56 operator: types.operatorT<any, any>, 57 result: any = 0 58) => 59 it('responds to eager Push signals (spec)', () => { 60 const values = []; 61 let talkback = null; 62 let push = null; 63 let pulls = 0; 64 65 const source: types.sourceT<any> = sink => { 66 push = (value: any) => sink(deriving.push(value)); 67 sink(deriving.start(tb => { 68 if (tb === deriving.pull) 69 pulls++; 70 })); 71 }; 72 73 const sink: types.sinkT<any> = signal => { 74 expect(deriving.isEnd(signal)).toBeFalsy(); 75 if (deriving.isStart(signal)) { 76 talkback = deriving.unboxStart(signal); 77 } else if (deriving.isPush(signal)) { 78 values.push(deriving.unboxPush(signal)); 79 talkback(deriving.pull); 80 } 81 }; 82 83 operator(source)(sink); 84 // No Pull signals should be issued initially 85 expect(pulls).toBe(0); 86 87 // When pushing a value we expect an immediate response 88 push(0); 89 jest.runAllTimers(); 90 expect(values).toEqual([result]); 91 // Subsequently the Pull signal should have travelled upwards 92 expect(pulls).toBe(1); 93 }); 94 95/* This tests a noop operator for Close talkback signals from the sink. 96 A Close signal will be sent, which should be forwarded to the source, 97 which then ends the communication without sending an End signal. */ 98const passesSinkClose = (operator: types.operatorT<any, any>) => 99 it('responds to Close signals from sink (spec)', () => { 100 let talkback = null; 101 let closing = 0; 102 103 const source: types.sourceT<any> = sink => { 104 sink(deriving.start(tb => { 105 if (tb === deriving.pull && !closing) { 106 sink(deriving.push(0)); 107 } else if (tb === deriving.close) { 108 closing++; 109 } 110 })); 111 }; 112 113 const sink: types.sinkT<any> = signal => { 114 expect(deriving.isEnd(signal)).toBeFalsy(); 115 if (deriving.isStart(signal)) { 116 talkback = deriving.unboxStart(signal); 117 } else if (deriving.isPush(signal)) { 118 talkback(deriving.close); 119 } 120 }; 121 122 operator(source)(sink); 123 124 // When pushing a value we expect an immediate close signal 125 talkback(deriving.pull); 126 jest.runAllTimers(); 127 expect(closing).toBe(1); 128 }); 129 130/* This tests a noop operator for End signals from the source. 131 A Push and End signal will be sent after the first Pull talkback 132 signal from the sink, which shouldn't lead to any extra Close or Pull 133 talkback signals. */ 134const passesSourceEnd = ( 135 operator: types.operatorT<any, any>, 136 result: any = 0 137) => 138 it('passes on immediate Push then End signals from source (spec)', () => { 139 const signals = []; 140 let talkback = null; 141 let pulls = 0; 142 let ending = 0; 143 144 const source: types.sourceT<any> = sink => { 145 sink(deriving.start(tb => { 146 expect(tb).not.toBe(deriving.close); 147 if (tb === deriving.pull) { 148 pulls++; 149 if (pulls === 1) { 150 sink(deriving.push(0)); 151 sink(deriving.end()); 152 } 153 } 154 })); 155 }; 156 157 const sink: types.sinkT<any> = signal => { 158 if (deriving.isStart(signal)) { 159 talkback = deriving.unboxStart(signal); 160 } else { 161 signals.push(signal); 162 if (deriving.isEnd(signal)) ending++; 163 } 164 }; 165 166 operator(source)(sink); 167 168 // When pushing a value we expect an immediate Push then End signal 169 talkback(deriving.pull); 170 jest.runAllTimers(); 171 expect(ending).toBe(1); 172 expect(signals).toEqual([deriving.push(result), deriving.end()]); 173 // Also no additional pull event should be created by the operator 174 expect(pulls).toBe(1); 175 }); 176 177/* This tests a noop operator for End signals from the source 178 after the first pull in response to another. 179 This is similar to passesSourceEnd but more well behaved since 180 mergeMap/switchMap/concatMap are eager operators. */ 181const passesSourcePushThenEnd = ( 182 operator: types.operatorT<any, any>, 183 result: any = 0 184) => 185 it('passes on End signals from source (spec)', () => { 186 const signals = []; 187 let talkback = null; 188 let pulls = 0; 189 let ending = 0; 190 191 const source: types.sourceT<any> = sink => { 192 sink(deriving.start(tb => { 193 expect(tb).not.toBe(deriving.close); 194 if (tb === deriving.pull) { 195 pulls++; 196 if (pulls <= 2) { sink(deriving.push(0)); } 197 else { sink(deriving.end()); } 198 } 199 })); 200 }; 201 202 const sink: types.sinkT<any> = signal => { 203 if (deriving.isStart(signal)) { 204 talkback = deriving.unboxStart(signal); 205 } else { 206 signals.push(signal); 207 if (deriving.isPush(signal)) talkback(deriving.pull); 208 if (deriving.isEnd(signal)) ending++; 209 } 210 }; 211 212 operator(source)(sink); 213 214 // When pushing a value we expect an immediate Push then End signal 215 talkback(deriving.pull); 216 jest.runAllTimers(); 217 expect(ending).toBe(1); 218 expect(pulls).toBe(3); 219 expect(signals).toEqual([ 220 deriving.push(result), 221 deriving.push(result), 222 deriving.end() 223 ]); 224 }); 225 226/* This tests a noop operator for Start signals from the source. 227 When the operator's sink is started by the source it'll receive 228 a Start event. As a response it should never send more than one 229 Start signals to the sink. */ 230const passesSingleStart = (operator: types.operatorT<any, any>) => 231 it('sends a single Start event to the incoming sink (spec)', () => { 232 let start = 0; 233 234 const source: types.sourceT<any> = sink => { 235 sink(deriving.start(() => {})); 236 }; 237 238 const sink: types.sinkT<any> = signal => { 239 if (deriving.isStart(signal)) start++; 240 }; 241 242 // When starting the operator we expect a single start event on the sink 243 operator(source)(sink); 244 expect(start).toBe(1); 245 }); 246 247/* This tests a noop operator for silence after End signals from the source. 248 When the operator receives the End signal it shouldn't forward any other 249 signals to the sink anymore. 250 This isn't a strict requirement, but some operators should ensure that 251 all sources are well behaved. This is particularly true for operators 252 that either Close sources themselves or may operate on multiple sources. */ 253const passesStrictEnd = (operator: types.operatorT<any, any>) => { 254 it('stops all signals after End has been received (spec: strict end)', () => { 255 let pulls = 0; 256 const signals = []; 257 258 const source: types.sourceT<any> = sink => { 259 sink(deriving.start(tb => { 260 if (tb === deriving.pull) { 261 pulls++; 262 sink(deriving.end()); 263 sink(deriving.push(123)); 264 } 265 })); 266 }; 267 268 const sink: types.sinkT<any> = signal => { 269 if (deriving.isStart(signal)) { 270 deriving.unboxStart(signal)(deriving.pull); 271 } else { 272 signals.push(signal); 273 } 274 }; 275 276 operator(source)(sink); 277 278 // The Push signal should've been dropped 279 jest.runAllTimers(); 280 expect(signals).toEqual([deriving.end()]); 281 expect(pulls).toBe(1); 282 }); 283 284 it('stops all signals after Close has been received (spec: strict close)', () => { 285 const signals = []; 286 287 const source: types.sourceT<any> = sink => { 288 sink(deriving.start(tb => { 289 if (tb === deriving.close) { 290 sink(deriving.push(123)); 291 } 292 })); 293 }; 294 295 const sink: types.sinkT<any> = signal => { 296 if (deriving.isStart(signal)) { 297 deriving.unboxStart(signal)(deriving.close); 298 } else { 299 signals.push(signal); 300 } 301 }; 302 303 operator(source)(sink); 304 305 // The Push signal should've been dropped 306 jest.runAllTimers(); 307 expect(signals).toEqual([]); 308 }); 309}; 310 311/* This tests an immediately closing operator for End signals to 312 the sink and Close signals to the source. 313 When an operator closes immediately we expect to see a Close 314 signal at the source and an End signal to the sink, since the 315 closing operator is expected to end the entire chain. */ 316const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) => 317 it('closes the source and ends the sink correctly (spec: ending operator)', () => { 318 let closing = 0; 319 let ending = 0; 320 321 const source: types.sourceT<any> = sink => { 322 sink(deriving.start(tb => { 323 // For some operator tests we do need to send a single value 324 if (tb === deriving.pull) 325 sink(deriving.push(null)); 326 if (tb === deriving.close) 327 closing++; 328 })); 329 }; 330 331 const sink: types.sinkT<any> = signal => { 332 if (deriving.isStart(signal)) { 333 deriving.unboxStart(signal)(deriving.pull); 334 } if (deriving.isEnd(signal)) { 335 ending++; 336 } 337 }; 338 339 // We expect the operator to immediately end and close 340 closingOperator(source)(sink); 341 expect(closing).toBe(1); 342 expect(ending).toBe(1); 343 }); 344 345const passesAsyncSequence = ( 346 operator: types.operatorT<any, any>, 347 result: any = 0 348) => 349 it('passes an async push with an async end (spec)', () => { 350 let hasPushed = false; 351 const signals = []; 352 353 const source: types.sourceT<any> = sink => { 354 sink(deriving.start(tb => { 355 if (tb === deriving.pull && !hasPushed) { 356 hasPushed = true; 357 setTimeout(() => sink(deriving.push(0)), 10); 358 setTimeout(() => sink(deriving.end()), 20); 359 } 360 })); 361 }; 362 363 const sink: types.sinkT<any> = signal => { 364 if (deriving.isStart(signal)) { 365 setTimeout(() => { 366 deriving.unboxStart(signal)(deriving.pull); 367 }, 5); 368 } else { 369 signals.push(signal); 370 } 371 }; 372 373 // We initially expect to see the push signal 374 // Afterwards after all timers all other signals come in 375 operator(source)(sink); 376 expect(signals.length).toBe(0); 377 jest.advanceTimersByTime(5); 378 expect(hasPushed).toBeTruthy(); 379 jest.runAllTimers(); 380 381 expect(signals).toEqual([ 382 deriving.push(result), 383 deriving.end() 384 ]); 385 }); 386 387beforeEach(() => { 388 jest.useFakeTimers(); 389}); 390 391describe('combine', () => { 392 const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source); 393 394 passesPassivePull(noop, [0, 0]); 395 passesActivePush(noop, [0, 0]); 396 passesSinkClose(noop); 397 passesSourceEnd(noop, [0, 0]); 398 passesSingleStart(noop); 399 passesStrictEnd(noop); 400 401 it('emits the zipped values of two sources', () => { 402 const { source: sourceA, next: nextA } = sources.makeSubject(); 403 const { source: sourceB, next: nextB } = sources.makeSubject(); 404 const fn = jest.fn(); 405 406 sinks.forEach(fn)(operators.combine(sourceA, sourceB)); 407 408 nextA(1); 409 expect(fn).not.toHaveBeenCalled(); 410 nextB(2); 411 expect(fn).toHaveBeenCalledWith([1, 2]); 412 }); 413}); 414 415describe('buffer', () => { 416 const valueThenNever: types.sourceT<any> = sink => 417 sink(deriving.start(tb => { 418 if (tb === deriving.pull) 419 sink(deriving.push(null)); 420 })); 421 422 const noop = operators.buffer(valueThenNever); 423 424 passesPassivePull(noop, [0]); 425 passesActivePush(noop, [0]); 426 passesSinkClose(noop); 427 passesSourcePushThenEnd(noop, [0]); 428 passesSingleStart(noop); 429 passesStrictEnd(noop); 430 431 it('emits batches of input values when a notifier emits', () => { 432 const { source: notifier$, next: notify } = sources.makeSubject(); 433 const { source: input$, next } = sources.makeSubject(); 434 const fn = jest.fn(); 435 436 sinks.forEach(fn)(operators.buffer(notifier$)(input$)); 437 438 next(1); 439 next(2); 440 expect(fn).not.toHaveBeenCalled(); 441 442 notify(null); 443 expect(fn).toHaveBeenCalledWith([1, 2]); 444 445 next(3); 446 notify(null); 447 expect(fn).toHaveBeenCalledWith([3]); 448 }); 449}); 450 451describe('concatMap', () => { 452 const noop = operators.concatMap(x => sources.fromValue(x)); 453 passesPassivePull(noop); 454 passesActivePush(noop); 455 passesSinkClose(noop); 456 passesSourcePushThenEnd(noop); 457 passesSingleStart(noop); 458 passesStrictEnd(noop); 459 passesAsyncSequence(noop); 460 461 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 462 it('emits values from each flattened synchronous source', () => { 463 const { source, next, complete } = sources.makeSubject<number>(); 464 const fn = jest.fn(); 465 466 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 467 468 next(1); 469 next(3); 470 complete(); 471 472 expect(fn).toHaveBeenCalledTimes(6); 473 expect(fn.mock.calls).toEqual([ 474 [deriving.start(expect.any(Function))], 475 [deriving.push(1)], 476 [deriving.push(2)], 477 [deriving.push(3)], 478 [deriving.push(4)], 479 [deriving.end()], 480 ]); 481 }); 482 483 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 484 it('lets inner sources finish when outer source ends', () => { 485 const values = []; 486 const teardown = jest.fn(); 487 const fn = (signal: types.signalT<any>) => { 488 values.push(signal); 489 if (deriving.isStart(signal)) { 490 deriving.unboxStart(signal)(deriving.pull); 491 deriving.unboxStart(signal)(deriving.close); 492 } 493 }; 494 495 operators.concatMap(() => { 496 return sources.make(() => teardown); 497 })(sources.fromValue(null))(fn); 498 499 expect(teardown).toHaveBeenCalled(); 500 expect(values).toEqual([ 501 deriving.start(expect.any(Function)), 502 ]); 503 }); 504 505 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap 506 it('emits values from each flattened asynchronous source, one at a time', () => { 507 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 508 const fn = jest.fn(); 509 510 sinks.forEach(fn)( 511 operators.concatMap((x: number) => { 512 return web.delay(5)(sources.fromArray([x, x * 2])); 513 })(source) 514 ); 515 516 jest.advanceTimersByTime(14); 517 expect(fn.mock.calls).toEqual([ 518 [1], 519 [2], 520 ]); 521 522 jest.runAllTimers(); 523 expect(fn.mock.calls).toEqual([ 524 [1], 525 [2], 526 [10], 527 [20], 528 ]); 529 }); 530 531 it('works for fully asynchronous sources', () => { 532 const fn = jest.fn(); 533 534 sinks.forEach(fn)( 535 operators.concatMap(() => { 536 return sources.make(observer => { 537 setTimeout(() => observer.next(1)); 538 return () => {}; 539 }) 540 })(sources.fromValue(null)) 541 ); 542 543 jest.runAllTimers(); 544 expect(fn).toHaveBeenCalledWith(1); 545 }); 546 547 it('emits synchronous values in order', () => { 548 const values = []; 549 550 sinks.forEach(x => values.push(x))( 551 operators.concat([ 552 sources.fromArray([1, 2]), 553 sources.fromArray([3, 4]) 554 ]) 555 ); 556 557 expect(values).toEqual([ 1, 2, 3, 4 ]); 558 }); 559}); 560 561describe('debounce', () => { 562 const noop = web.debounce(() => 0); 563 passesPassivePull(noop); 564 passesActivePush(noop); 565 passesSinkClose(noop); 566 passesSourceEnd(noop); 567 passesSingleStart(noop); 568 passesStrictEnd(noop); 569 passesAsyncSequence(noop); 570 571 it('waits for a specified amount of silence before emitting the last value', () => { 572 const { source, next } = sources.makeSubject<number>(); 573 const fn = jest.fn(); 574 575 sinks.forEach(fn)(web.debounce(() => 100)(source)); 576 577 next(1); 578 jest.advanceTimersByTime(50); 579 expect(fn).not.toHaveBeenCalled(); 580 581 next(2); 582 jest.advanceTimersByTime(99); 583 expect(fn).not.toHaveBeenCalled(); 584 585 jest.advanceTimersByTime(1); 586 expect(fn).toHaveBeenCalledWith(2); 587 }); 588 589 it('emits debounced value with delayed End signal', () => { 590 const { source, next, complete } = sources.makeSubject<number>(); 591 const fn = jest.fn(); 592 593 sinks.forEach(fn)(web.debounce(() => 100)(source)); 594 595 next(1); 596 complete(); 597 jest.advanceTimersByTime(100); 598 expect(fn).toHaveBeenCalled(); 599 }); 600}); 601 602describe('delay', () => { 603 const noop = web.delay(0); 604 passesPassivePull(noop); 605 passesActivePush(noop); 606 passesSinkClose(noop); 607 passesSourceEnd(noop); 608 passesSingleStart(noop); 609 passesAsyncSequence(noop); 610 611 it('delays outputs by a specified delay timeout value', () => { 612 const { source, next } = sources.makeSubject(); 613 const fn = jest.fn(); 614 615 sinks.forEach(fn)(web.delay(100)(source)); 616 617 next(1); 618 expect(fn).not.toHaveBeenCalled(); 619 620 jest.advanceTimersByTime(100); 621 expect(fn).toHaveBeenCalledWith(1); 622 }); 623}); 624 625describe('filter', () => { 626 const noop = operators.filter(() => true); 627 passesPassivePull(noop); 628 passesActivePush(noop); 629 passesSinkClose(noop); 630 passesSourceEnd(noop); 631 passesSingleStart(noop); 632 passesAsyncSequence(noop); 633 634 it('prevents emissions for which a predicate fails', () => { 635 const { source, next } = sources.makeSubject(); 636 const fn = jest.fn(); 637 638 sinks.forEach(fn)(operators.filter(x => !!x)(source)); 639 640 next(false); 641 expect(fn).not.toHaveBeenCalled(); 642 643 next(true); 644 expect(fn).toHaveBeenCalledWith(true); 645 }); 646}); 647 648describe('map', () => { 649 const noop = operators.map(x => x); 650 passesPassivePull(noop); 651 passesActivePush(noop); 652 passesSinkClose(noop); 653 passesSourceEnd(noop); 654 passesSingleStart(noop); 655 passesAsyncSequence(noop); 656 657 it('maps over values given a transform function', () => { 658 const { source, next } = sources.makeSubject<number>(); 659 const fn = jest.fn(); 660 661 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source)); 662 663 next(1); 664 expect(fn).toHaveBeenCalledWith(2); 665 }); 666}); 667 668describe('mergeMap', () => { 669 const noop = operators.mergeMap(x => sources.fromValue(x)); 670 passesPassivePull(noop); 671 passesActivePush(noop); 672 passesSinkClose(noop); 673 passesSourcePushThenEnd(noop); 674 passesSingleStart(noop); 675 passesStrictEnd(noop); 676 passesAsyncSequence(noop); 677 678 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 679 it('emits values from each flattened synchronous source', () => { 680 const { source, next, complete } = sources.makeSubject<number>(); 681 const fn = jest.fn(); 682 683 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 684 685 next(1); 686 next(3); 687 complete(); 688 689 expect(fn.mock.calls).toEqual([ 690 [deriving.start(expect.any(Function))], 691 [deriving.push(1)], 692 [deriving.push(2)], 693 [deriving.push(3)], 694 [deriving.push(4)], 695 [deriving.end()], 696 ]); 697 }); 698 699 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 700 it('lets inner sources finish when outer source ends', () => { 701 const values = []; 702 const teardown = jest.fn(); 703 const fn = (signal: types.signalT<any>) => { 704 values.push(signal); 705 if (deriving.isStart(signal)) { 706 deriving.unboxStart(signal)(deriving.pull); 707 deriving.unboxStart(signal)(deriving.close); 708 } 709 }; 710 711 operators.mergeMap(() => { 712 return sources.make(() => teardown); 713 })(sources.fromValue(null))(fn); 714 715 expect(teardown).toHaveBeenCalled(); 716 expect(values).toEqual([ 717 deriving.start(expect.any(Function)), 718 ]); 719 }); 720 721 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap 722 it('emits values from each flattened asynchronous source simultaneously', () => { 723 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 724 const fn = jest.fn(); 725 726 sinks.forEach(fn)( 727 operators.mergeMap((x: number) => { 728 return web.delay(5)(sources.fromArray([x, x * 2])); 729 })(source) 730 ); 731 732 jest.runAllTimers(); 733 expect(fn.mock.calls).toEqual([ 734 [1], 735 [10], 736 [2], 737 [20], 738 ]); 739 }); 740 741 it('emits synchronous values in order', () => { 742 const values = []; 743 744 sinks.forEach(x => values.push(x))( 745 operators.merge([ 746 sources.fromArray([1, 2]), 747 sources.fromArray([3, 4]) 748 ]) 749 ); 750 751 expect(values).toEqual([ 1, 2, 3, 4 ]); 752 }); 753}); 754 755describe('onEnd', () => { 756 const noop = operators.onEnd(() => {}); 757 passesPassivePull(noop); 758 passesActivePush(noop); 759 passesSinkClose(noop); 760 passesSourceEnd(noop); 761 passesSingleStart(noop); 762 passesAsyncSequence(noop); 763 764 it('calls a callback when the source ends', () => { 765 const { source, next, complete } = sources.makeSubject<number>(); 766 const fn = jest.fn(); 767 768 sinks.forEach(() => {})(operators.onEnd(fn)(source)); 769 770 next(null); 771 expect(fn).not.toHaveBeenCalled(); 772 773 complete(); 774 expect(fn).toHaveBeenCalled(); 775 }); 776}); 777 778describe('onPush', () => { 779 const noop = operators.onPush(() => {}); 780 passesPassivePull(noop); 781 passesActivePush(noop); 782 passesSinkClose(noop); 783 passesSourceEnd(noop); 784 passesSingleStart(noop); 785 passesAsyncSequence(noop); 786 787 it('calls a callback when the source emits', () => { 788 const { source, next } = sources.makeSubject<number>(); 789 const fn = jest.fn(); 790 791 sinks.forEach(() => {})(operators.onPush(fn)(source)); 792 793 next(1); 794 expect(fn).toHaveBeenCalledWith(1); 795 next(2); 796 expect(fn).toHaveBeenCalledWith(2); 797 }); 798 799 it('is the same as `tap`', () => { 800 expect(operators.onPush).toBe(operators.tap); 801 }); 802}); 803 804describe('onStart', () => { 805 const noop = operators.onStart(() => {}); 806 passesPassivePull(noop); 807 passesActivePush(noop); 808 passesSinkClose(noop); 809 passesSourceEnd(noop); 810 passesSingleStart(noop); 811 passesAsyncSequence(noop); 812 813 it('is called when the source starts', () => { 814 let sink: types.sinkT<any>; 815 816 const fn = jest.fn(); 817 const source: types.sourceT<any> = _sink => { sink = _sink; }; 818 819 sinks.forEach(() => {})(operators.onStart(fn)(source)); 820 821 expect(fn).not.toHaveBeenCalled(); 822 823 sink(deriving.start(() => {})); 824 expect(fn).toHaveBeenCalled(); 825 }); 826}); 827 828describe('sample', () => { 829 const valueThenNever: types.sourceT<any> = sink => 830 sink(deriving.start(tb => { 831 if (tb === deriving.pull) 832 sink(deriving.push(null)); 833 })); 834 835 const noop = operators.sample(valueThenNever); 836 837 passesPassivePull(noop); 838 passesActivePush(noop); 839 passesSinkClose(noop); 840 passesSourcePushThenEnd(noop); 841 passesSingleStart(noop); 842 passesStrictEnd(noop); 843 844 it('emits the latest value when a notifier source emits', () => { 845 const { source: notifier$, next: notify } = sources.makeSubject(); 846 const { source: input$, next } = sources.makeSubject(); 847 const fn = jest.fn(); 848 849 sinks.forEach(fn)(operators.sample(notifier$)(input$)); 850 851 next(1); 852 next(2); 853 expect(fn).not.toHaveBeenCalled(); 854 855 notify(null); 856 expect(fn).toHaveBeenCalledWith(2); 857 }); 858}); 859 860describe('scan', () => { 861 const noop = operators.scan((_acc, x) => x, null); 862 passesPassivePull(noop); 863 passesActivePush(noop); 864 passesSinkClose(noop); 865 passesSourceEnd(noop); 866 passesSingleStart(noop); 867 passesAsyncSequence(noop); 868 869 it('folds values continuously with a reducer and initial value', () => { 870 const { source: input$, next } = sources.makeSubject<number>(); 871 const fn = jest.fn(); 872 873 const reducer = (acc: number, x: number) => acc + x; 874 sinks.forEach(fn)(operators.scan(reducer, 0)(input$)); 875 876 next(1); 877 expect(fn).toHaveBeenCalledWith(1); 878 next(2); 879 expect(fn).toHaveBeenCalledWith(3); 880 }); 881}); 882 883describe('share', () => { 884 const noop = operators.share; 885 passesPassivePull(noop); 886 passesActivePush(noop); 887 passesSinkClose(noop); 888 passesSourceEnd(noop); 889 passesSingleStart(noop); 890 passesStrictEnd(noop); 891 passesAsyncSequence(noop); 892 893 it('shares output values between sinks', () => { 894 let push = () => {}; 895 896 const source: types.sourceT<any> = operators.share(sink => { 897 sink(deriving.start(() => {})); 898 push = () => { 899 sink(deriving.push([0])); 900 sink(deriving.end()); 901 }; 902 }); 903 904 const fnA = jest.fn(); 905 const fnB = jest.fn(); 906 907 sinks.forEach(fnA)(source); 908 sinks.forEach(fnB)(source); 909 push(); 910 911 expect(fnA).toHaveBeenCalledWith([0]); 912 expect(fnB).toHaveBeenCalledWith([0]); 913 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]); 914 }); 915}); 916 917describe('skip', () => { 918 const noop = operators.skip(0); 919 passesPassivePull(noop); 920 passesActivePush(noop); 921 passesSinkClose(noop); 922 passesSourceEnd(noop); 923 passesSingleStart(noop); 924 passesAsyncSequence(noop); 925 926 it('skips a number of values before emitting normally', () => { 927 const { source, next } = sources.makeSubject<number>(); 928 const fn = jest.fn(); 929 930 sinks.forEach(fn)(operators.skip(1)(source)); 931 932 next(1); 933 expect(fn).not.toHaveBeenCalled(); 934 next(2); 935 expect(fn).toHaveBeenCalledWith(2); 936 }); 937}); 938 939describe('skipUntil', () => { 940 const noop = operators.skipUntil(sources.fromValue(null)); 941 passesPassivePull(noop); 942 passesActivePush(noop); 943 passesSinkClose(noop); 944 passesSourceEnd(noop); 945 passesSingleStart(noop); 946 passesAsyncSequence(noop); 947 passesStrictEnd(noop); 948 949 it('skips values until the notifier source emits', () => { 950 const { source: notifier$, next: notify } = sources.makeSubject(); 951 const { source: input$, next } = sources.makeSubject<number>(); 952 const fn = jest.fn(); 953 954 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$)); 955 956 next(1); 957 expect(fn).not.toHaveBeenCalled(); 958 notify(null); 959 next(2); 960 expect(fn).toHaveBeenCalledWith(2); 961 }); 962}); 963 964describe('skipWhile', () => { 965 const noop = operators.skipWhile(() => false); 966 passesPassivePull(noop); 967 passesActivePush(noop); 968 passesSinkClose(noop); 969 passesSourceEnd(noop); 970 passesSingleStart(noop); 971 passesAsyncSequence(noop); 972 973 it('skips values until one fails a predicate', () => { 974 const { source, next } = sources.makeSubject<number>(); 975 const fn = jest.fn(); 976 977 sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source)); 978 979 next(1); 980 expect(fn).not.toHaveBeenCalled(); 981 next(2); 982 expect(fn).toHaveBeenCalledWith(2); 983 }); 984}); 985 986describe('switchMap', () => { 987 const noop = operators.switchMap(x => sources.fromValue(x)); 988 passesPassivePull(noop); 989 passesActivePush(noop); 990 passesSinkClose(noop); 991 passesSourcePushThenEnd(noop); 992 passesSingleStart(noop); 993 passesStrictEnd(noop); 994 passesAsyncSequence(noop); 995 996 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 997 it('emits values from each flattened synchronous source', () => { 998 const { source, next, complete } = sources.makeSubject<number>(); 999 const fn = jest.fn(); 1000 1001 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 1002 1003 next(1); 1004 next(3); 1005 complete(); 1006 1007 expect(fn).toHaveBeenCalledTimes(6); 1008 expect(fn.mock.calls).toEqual([ 1009 [deriving.start(expect.any(Function))], 1010 [deriving.push(1)], 1011 [deriving.push(2)], 1012 [deriving.push(3)], 1013 [deriving.push(4)], 1014 [deriving.end()], 1015 ]); 1016 }); 1017 1018 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 1019 it('lets inner sources finish when outer source ends', () => { 1020 const values = []; 1021 const teardown = jest.fn(); 1022 const fn = (signal: types.signalT<any>) => { 1023 values.push(signal); 1024 if (deriving.isStart(signal)) { 1025 deriving.unboxStart(signal)(deriving.pull); 1026 deriving.unboxStart(signal)(deriving.close); 1027 } 1028 }; 1029 1030 operators.switchMap(() => { 1031 return sources.make(() => teardown); 1032 })(sources.fromValue(null))(fn); 1033 1034 expect(teardown).toHaveBeenCalled(); 1035 expect(values).toEqual([ 1036 deriving.start(expect.any(Function)), 1037 ]); 1038 }); 1039 1040 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap 1041 it('emits values from each flattened asynchronous source, one at a time', () => { 1042 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 1043 const fn = jest.fn(); 1044 1045 sinks.forEach(fn)( 1046 operators.switchMap((x: number) => ( 1047 operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5))) 1048 ))(source) 1049 ); 1050 1051 jest.runAllTimers(); 1052 expect(fn.mock.calls).toEqual([ 1053 [1], 1054 [10], 1055 [20], 1056 ]); 1057 }); 1058}); 1059 1060describe('take', () => { 1061 const noop = operators.take(10); 1062 passesPassivePull(noop); 1063 passesActivePush(noop); 1064 passesSinkClose(noop); 1065 passesSourceEnd(noop); 1066 passesSingleStart(noop); 1067 passesStrictEnd(noop); 1068 passesAsyncSequence(noop); 1069 1070 passesCloseAndEnd(operators.take(0)); 1071 1072 it('emits values until a maximum is reached', () => { 1073 const { source, next } = sources.makeSubject<number>(); 1074 const fn = jest.fn(); 1075 1076 operators.take(1)(source)(fn); 1077 next(1); 1078 1079 expect(fn).toHaveBeenCalledTimes(3); 1080 expect(fn.mock.calls).toEqual([ 1081 [deriving.start(expect.any(Function))], 1082 [deriving.push(1)], 1083 [deriving.end()], 1084 ]); 1085 }); 1086}); 1087 1088describe('takeUntil', () => { 1089 const noop = operators.takeUntil(sources.never); 1090 passesPassivePull(noop); 1091 passesActivePush(noop); 1092 passesSinkClose(noop); 1093 passesSourcePushThenEnd(noop); 1094 passesSingleStart(noop); 1095 passesStrictEnd(noop); 1096 passesAsyncSequence(noop); 1097 1098 const ending = operators.takeUntil(sources.fromValue(null)); 1099 passesCloseAndEnd(ending); 1100 1101 it('emits values until a notifier emits', () => { 1102 const { source: notifier$, next: notify } = sources.makeSubject<number>(); 1103 const { source: input$, next } = sources.makeSubject<number>(); 1104 const fn = jest.fn(); 1105 1106 operators.takeUntil(notifier$)(input$)(fn); 1107 next(1); 1108 1109 expect(fn).toHaveBeenCalledTimes(2); 1110 expect(fn.mock.calls).toEqual([ 1111 [deriving.start(expect.any(Function))], 1112 [deriving.push(1)], 1113 ]); 1114 1115 notify(null); 1116 expect(fn).toHaveBeenCalledTimes(3); 1117 expect(fn.mock.calls[2][0]).toEqual(deriving.end()); 1118 }); 1119}); 1120 1121describe('takeWhile', () => { 1122 const noop = operators.takeWhile(() => true); 1123 passesPassivePull(noop); 1124 passesActivePush(noop); 1125 passesSinkClose(noop); 1126 passesSourceEnd(noop); 1127 passesSingleStart(noop); 1128 passesAsyncSequence(noop); 1129 1130 const ending = operators.takeWhile(() => false); 1131 passesCloseAndEnd(ending); 1132 1133 it('emits values while a predicate passes for all values', () => { 1134 const { source, next } = sources.makeSubject<number>(); 1135 const fn = jest.fn(); 1136 1137 operators.takeWhile(x => x < 2)(source)(fn); 1138 next(1); 1139 next(2); 1140 1141 expect(fn.mock.calls).toEqual([ 1142 [deriving.start(expect.any(Function))], 1143 [deriving.push(1)], 1144 [deriving.end()], 1145 ]); 1146 }); 1147}); 1148 1149describe('takeLast', () => { 1150 passesCloseAndEnd(operators.takeLast(0)); 1151 1152 it('emits the last max values of an ended source', () => { 1153 const { source, next, complete } = sources.makeSubject<number>(); 1154 const values = []; 1155 1156 let talkback; 1157 operators.takeLast(1)(source)(signal => { 1158 values.push(signal); 1159 if (deriving.isStart(signal)) 1160 talkback = deriving.unboxStart(signal); 1161 if (!deriving.isEnd(signal)) 1162 talkback(deriving.pull); 1163 }); 1164 1165 next(1); 1166 next(2); 1167 1168 expect(values.length).toBe(0); 1169 complete(); 1170 1171 expect(values).toEqual([ 1172 deriving.start(expect.any(Function)), 1173 deriving.push(2), 1174 deriving.end(), 1175 ]); 1176 }); 1177}); 1178 1179describe('throttle', () => { 1180 const noop = web.throttle(() => 0); 1181 passesPassivePull(noop); 1182 passesActivePush(noop); 1183 passesSinkClose(noop); 1184 passesSourceEnd(noop); 1185 passesSingleStart(noop); 1186 passesAsyncSequence(noop); 1187 1188 it('should ignore emissions for a period of time after a value', () => { 1189 const { source, next } = sources.makeSubject<number>(); 1190 const fn = jest.fn(); 1191 1192 sinks.forEach(fn)(web.throttle(() => 100)(source)); 1193 1194 next(1); 1195 expect(fn).toHaveBeenCalledWith(1); 1196 jest.advanceTimersByTime(50); 1197 1198 next(2); 1199 expect(fn).toHaveBeenCalledTimes(1); 1200 jest.advanceTimersByTime(50); 1201 1202 next(3); 1203 expect(fn).toHaveBeenCalledWith(3); 1204 }); 1205});