Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.3 33 kB view raw
import * as deriving from './helpers/Wonka_deriving';
import * as sources from './Wonka_sources.gen';
import * as sinks from './Wonka_sinks.gen';
import * as operators from './Wonka_operators.gen';
import * as web from './web/WonkaJs.gen';
import * as types from './Wonka_types.gen';

/* This tests a noop operator for passive Pull talkback signals.
   A Pull will be sent from the sink upwards and should pass through
   the operator until the source receives it, which then pushes a
   value down.

   `operator` is the operator under test. `output` is the value the sink
   is expected to have collected after the source pushed its single `0`
   (defaults to 0, i.e. the operator passes the value through untouched). */
const passesPassivePull = (
  operator: types.operatorT<any, any>,
  output: any = 0
) =>
  it('responds to Pull talkback signals (spec)', () => {
    let talkback = null;
    let push = 0;
    const values = [];

    // Source that answers only the very first Pull with a single Push of 0
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (!push && tb === deriving.pull) {
          push++;
          sink(deriving.push(0));
        }
      }));
    };

    // Sink that records pushed values, keeps the talkback and rejects End
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isPush(signal)) {
        values.push(deriving.unboxPush(signal));
      } else if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately
    expect(talkback).not.toBe(null);
    // No Push signals should be issued initially
    expect(values).toEqual([]);

    // When pulling a value we expect an immediate response
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(values).toEqual([output]);
  });

/* This tests a noop operator for regular, active Push signals.
   A Push will be sent downwards from the source, through the
   operator to the sink. Pull events should be let through from
   the sink after every Push event.
*/
// `operator` is the operator under test; `result` is the value the sink is
// expected to have collected after the source pushed `0` (defaults to 0).
const passesActivePush = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('responds to eager Push signals (spec)', () => {
    const values = [];
    let talkback = null;
    let push = null;
    let pulls = 0;

    // Source that exposes a `push` handle to the test and counts Pull signals
    const source: types.sourceT<any> = sink => {
      push = (value: any) => sink(deriving.push(value));
      sink(deriving.start(tb => {
        if (tb === deriving.pull)
          pulls++;
      }));
    };

    // Sink that records every Push and answers each one with a Pull
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        values.push(deriving.unboxPush(signal));
        talkback(deriving.pull);
      }
    };

    operator(source)(sink);
    // No Pull signals should be issued initially
    expect(pulls).toBe(0);

    // When pushing a value we expect an immediate response
    push(0);
    jest.runAllTimers();
    expect(values).toEqual([result]);
    // Subsequently the Pull signal should have travelled upwards
    expect(pulls).toBe(1);
  });

/* This tests a noop operator for Close talkback signals from the sink.
   A Close signal will be sent, which should be forwarded to the source,
   which then ends the communication without sending an End signal.
*/
// `operator` is the operator under test.
const passesSinkClose = (operator: types.operatorT<any, any>) =>
  it('responds to Close signals from sink (spec)', () => {
    let talkback = null;
    let closing = 0;

    // Source that pushes 0 on Pull (until closed) and counts Close signals
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull && !closing) {
          sink(deriving.push(0));
        } else if (tb === deriving.close) {
          closing++;
        }
      }));
    };

    // Sink that closes as soon as it receives its first Push
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        talkback(deriving.close);
      }
    };

    operator(source)(sink);

    // When pushing a value we expect an immediate close signal
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(closing).toBe(1);
  });

/* This tests a noop operator for End signals from the source.
   A Push and End signal will be sent after the first Pull talkback
   signal from the sink, which shouldn't lead to any extra Close or Pull
   talkback signals.
*/
// `operator` is the operator under test; `result` is the payload expected in
// the Push signal that reaches the sink (defaults to 0).
const passesSourceEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on immediate Push then End signals from source (spec)', () => {
    const signals = [];
    let talkback = null;
    let pulls = 0;
    let ending = 0;

    // Source that answers the first Pull with Push(0) immediately followed
    // by End, and fails the test if it ever receives a Close
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        expect(tb).not.toBe(deriving.close);
        if (tb === deriving.pull) {
          pulls++;
          if (pulls === 1) {
            sink(deriving.push(0));
            sink(deriving.end());
          }
        }
      }));
    };

    // Sink that records every non-Start signal and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else {
        signals.push(signal);
        if (deriving.isEnd(signal)) ending++;
      }
    };

    operator(source)(sink);

    // When pushing a value we expect an immediate Push then End signal
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(ending).toBe(1);
    expect(signals).toEqual([deriving.push(result), deriving.end()]);
    // Also no additional pull event should be created by the operator
    expect(pulls).toBe(1);
  });

/* This tests a noop operator for End signals from the source
   after the first pull in response to another.
   This is similar to passesSourceEnd but more well behaved since
   mergeMap/switchMap/concatMap are eager operators.
*/
// `operator` is the operator under test; `result` is the payload expected in
// each of the two Push signals that reach the sink (defaults to 0).
const passesSourcePushThenEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on End signals from source (spec)', () => {
    const signals = [];
    let talkback = null;
    let pulls = 0;
    let ending = 0;

    // Source that answers the first two Pulls with Push(0) and the third
    // with End, and fails the test if it ever receives a Close
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        expect(tb).not.toBe(deriving.close);
        if (tb === deriving.pull) {
          pulls++;
          if (pulls <= 2) {
            sink(deriving.push(0));
          } else {
            sink(deriving.end());
          }
        }
      }));
    };

    // Sink that records every non-Start signal and pulls again after each Push
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else {
        signals.push(signal);
        if (deriving.isPush(signal)) talkback(deriving.pull);
        if (deriving.isEnd(signal)) ending++;
      }
    };

    operator(source)(sink);

    // When pushing a value we expect an immediate Push then End signal
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(ending).toBe(1);
    expect(pulls).toBe(3);
    expect(signals).toEqual([
      deriving.push(result),
      deriving.push(result),
      deriving.end()
    ]);
  });

/* This tests a noop operator for Start signals from the source.
   When the operator's sink is started by the source it'll receive
   a Start event. As a response it should never send more than one
   Start signals to the sink.

   `operator` is the operator under test. */
const passesSingleStart = (operator: types.operatorT<any, any>) =>
  it('sends a single Start event to the incoming sink (spec)', () => {
    let start = 0;

    // Source that only ever emits a Start signal with an inert talkback
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(() => {}));
    };

    // Sink that counts the Start signals it receives
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) start++;
    };

    // When starting the operator we expect a single start event on the sink
    operator(source)(sink);
    expect(start).toBe(1);
  });

/* This tests a noop operator for silence after End signals from the source.
When the operator receives the End signal it shouldn't forward any other
   signals to the sink anymore.
   This isn't a strict requirement, but some operators should ensure that
   all sources are well behaved. This is particularly true for operators
   that either Close sources themselves or may operate on multiple sources. */
// `operator` is the operator under test. Two specs are registered: one for a
// misbehaving source that pushes after End, one for a source that pushes
// after having received Close.
const passesStrictEnd = (operator: types.operatorT<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    let pulls = 0;
    const signals = [];

    // Misbehaving source: on Pull it sends End and then still pushes 123
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          pulls++;
          sink(deriving.end());
          sink(deriving.push(123));
        }
      }));
    };

    // Sink that pulls once on Start and records every other signal
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      } else {
        signals.push(signal);
      }
    };

    operator(source)(sink);

    // The Push signal should've been dropped
    jest.runAllTimers();
    expect(signals).toEqual([deriving.end()]);
    expect(pulls).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const signals = [];

    // Misbehaving source: it still pushes 123 after being told to Close
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.close) {
          sink(deriving.push(123));
        }
      }));
    };

    // Sink that closes immediately on Start and records every other signal
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.close);
      } else {
        signals.push(signal);
      }
    };

    operator(source)(sink);

    // The Push signal should've been dropped
    jest.runAllTimers();
    expect(signals).toEqual([]);
  });
};

/* This tests an immediately closing operator for End signals to
   the sink and Close signals to the source.
When an operator closes immediately we expect to see a Close
   signal at the source and an End signal to the sink, since the
   closing operator is expected to end the entire chain. */
// `closingOperator` is an operator configured so that it terminates the
// chain right away (e.g. `take(0)`).
const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closing = 0;
    let ending = 0;

    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        // For some operator tests we do need to send a single value
        if (tb === deriving.pull)
          sink(deriving.push(null));
        if (tb === deriving.close)
          closing++;
      }));
    };

    // Sink that pulls once on Start and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      }
      if (deriving.isEnd(signal)) {
        ending++;
      }
    };

    // We expect the operator to immediately end and close
    closingOperator(source)(sink);
    expect(closing).toBe(1);
    expect(ending).toBe(1);
  });

/* This tests a noop operator against a fully asynchronous exchange.
   The sink sends its Pull 5ms after receiving Start; on that first Pull
   the source schedules a Push of 0 after 10ms and an End after 20ms.
   Nothing may reach the sink synchronously, and after all timers have
   run the sink must have seen exactly the Push followed by the End.

   `operator` is the operator under test. `result` is the payload expected
   in the Push signal that reaches the sink (defaults to 0). */
const passesAsyncSequence = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes an async push with an async end (spec)', () => {
    let hasPushed = false;
    const signals = [];

    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull && !hasPushed) {
          hasPushed = true;
          setTimeout(() => sink(deriving.push(0)), 10);
          setTimeout(() => sink(deriving.end()), 20);
        }
      }));
    };

    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        setTimeout(() => {
          deriving.unboxStart(signal)(deriving.pull);
        }, 5);
      } else {
        signals.push(signal);
      }
    };

    // We initially expect to see the push signal
    // Afterwards after all timers all other signals come in
    operator(source)(sink);
    expect(signals.length).toBe(0);
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    jest.runAllTimers();

    expect(signals).toEqual([
      deriving.push(result),
      deriving.end()
    ]);
  });

// Every test runs on Jest's fake timers so that the specs can flush or
// advance time deterministically.
beforeEach(() => {
  jest.useFakeTimers();
});

describe('combine', () => {
  // Pairs the source under test with a constant `fromValue(0)` source
  const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source);

  passesPassivePull(noop, [0, 0]);
  passesActivePush(noop, [0, 0]);
  passesSinkClose(noop);
  passesSourceEnd(noop, [0, 0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the zipped values of two sources', () => {
    const { source: sourceA, next: nextA } = sources.makeSubject();
    const { source: sourceB, next: nextB } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.combine(sourceA, sourceB));

    nextA(1);
    expect(fn).not.toHaveBeenCalled();
    nextB(2);
    expect(fn).toHaveBeenCalledWith([1, 2]);
  });
});

describe('buffer', () => {
  // Notifier that pushes `null` on every Pull and never ends
  const valueThenNever: types.sourceT<any> = sink =>
    sink(deriving.start(tb => {
      if (tb === deriving.pull)
        sink(deriving.push(null));
    }));

  const noop = operators.buffer(valueThenNever);

  passesPassivePull(noop, [0]);
  passesActivePush(noop, [0]);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop, [0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.buffer(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith([1, 2]);

    next(3);
    notify(null);
    expect(fn).toHaveBeenCalledWith([3]);
  });
});

describe('concatMap', () => {
  // Maps every value to a single-value source of that same value
  const noop = operators.concatMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const values = [];
    const teardown = jest.fn();
    // Raw sink that pulls once and then closes right after Start
    const fn = (signal: types.signalT<any>) => {
      values.push(signal);
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
        deriving.unboxStart(signal)(deriving.close);
      }
    };

    operators.concatMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.concatMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    jest.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([
      [1],
      [2],
    ]);

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });

  it('works for fully asynchronous sources', () => {
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.concatMap(() => {
        return sources.make(observer => {
          setTimeout(() => observer.next(1));
          return () => {};
        });
      })(sources.fromValue(null))
    );

    jest.runAllTimers();
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('emits synchronous values in order', () => {
    const values = [];

    sinks.forEach(x => values.push(x))(
      operators.concat([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(values).toEqual([ 1, 2, 3, 4 ]);
  });
});

describe('debounce', () => {
  // Debounce duration callback that always returns 0
  const noop = web.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.debounce(() => 100)(source));

    next(1);
    jest.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    next(2);
    jest.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('emits debounced value with delayed End signal', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.debounce(() => 100)(source));

    next(1);
    complete();
    jest.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalled();
  });
});

describe('delay', () => {
  const noop = web.delay(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(web.delay(100)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  // Predicate that accepts every value
  const noop = operators.filter(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.filter(x => !!x)(source));

    next(false);
    expect(fn).not.toHaveBeenCalled();

    next(true);
    expect(fn).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  // Identity transform
  const noop = operators.map(x => x);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  // Maps every value to a single-value source of that same value
  const noop = operators.mergeMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('lets inner sources finish when outer source ends', () => {
    const values = [];
    const teardown = jest.fn();
    // Raw sink that pulls once and then closes right after Start
    const fn = (signal: types.signalT<any>) => {
      values.push(signal);
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
        deriving.unboxStart(signal)(deriving.close);
      }
    };

    operators.mergeMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([
      [1],
      [10],
      [2],
      [20],
    ]);
  });

  it('emits synchronous values in order', () => {
    const values = [];

    sinks.forEach(x => values.push(x))(
      operators.merge([
        sources.fromArray([1, 2]),
        sources.fromArray([3, 4])
      ])
    );

    expect(values).toEqual([ 1, 2, 3, 4 ]);
  });
});

describe('onEnd', () => {
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    next(null);
    expect(fn).not.toHaveBeenCalled();

    complete();
    expect(fn).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  const noop = operators.onPush(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesStrictEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onPush(fn)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  const noop = operators.onStart(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    let sink: types.sinkT<any>;

    const fn = jest.fn();
    // Source that only captures the sink so the test can send Start later
    const source: types.sourceT<any> = _sink => { sink = _sink; };

    sinks.forEach(() => {})(operators.onStart(fn)(source));

    expect(fn).not.toHaveBeenCalled();

    sink(deriving.start(() => {}));
    expect(fn).toHaveBeenCalled();
  });
});

describe('sample', () => {
  // Notifier that pushes `null` on every Pull and never ends
  const valueThenNever: types.sourceT<any> = sink =>
    sink(deriving.start(tb => {
      if (tb === deriving.pull)
        sink(deriving.push(null));
    }));

  const noop = operators.sample(valueThenNever);

  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.sample(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('scan', () => {
  // Reducer that ignores the accumulator and returns the incoming value
  const noop = operators.scan((_acc, x) => x, null);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    const reducer = (acc: number, x: number) => acc + x;
    sinks.forEach(fn)(operators.scan(reducer, 0)(input$));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  const noop = operators.share;
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    let push = () => {};

    // Shared source that exposes a `push` handle emitting `[0]` then End
    const source: types.sourceT<any> = operators.share(sink => {
      sink(deriving.start(() => {}));
      push = () => {
        sink(deriving.push([0]));
        sink(deriving.end());
      };
    });

    const fnA = jest.fn();
    const fnB = jest.fn();

    sinks.forEach(fnA)(source);
    sinks.forEach(fnB)(source);
    push();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    // Both sinks must have received the very same array instance
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
  });
});

describe('skip', () => {
  const noop = operators.skip(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skip(1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  const noop = operators.skipUntil(sources.fromValue(null));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);
  passesStrictEnd(noop);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    notify(null);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  // Predicate that is false for every value
  const noop = operators.skipWhile(() => false);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  // Maps every value to a single-value source of that same value
  const noop = operators.switchMap(x => sources.fromValue(x));
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('lets inner sources finish when outer source ends', () => {
    const values = [];
    const teardown = jest.fn();
    // Raw sink that pulls once and then closes right after Start
    const fn = (signal: types.signalT<any>) => {
      values.push(signal);
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
        deriving.unboxStart(signal)(deriving.close);
      }
    };

    operators.switchMap(() => {
      return sources.make(() => teardown);
    })(sources.fromValue(null))(fn);

    expect(teardown).toHaveBeenCalled();
    expect(values).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });

  // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.switchMap((x: number) => (
        operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5)))
      ))(source)
    );

    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([
      [1],
      [10],
      [20],
    ]);
  });
});

describe('take', () => {
  const noop = operators.take(10);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  passesCloseAndEnd(operators.take(0));

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.take(1)(source)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('takeUntil', () => {
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourcePushThenEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<number>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
    ]);

    notify(null);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(deriving.end());
  });
});

describe('takeWhile', () => {
  // Predicate that accepts every value
  const noop = operators.takeWhile(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  // Predicate that rejects every value
  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    operators.takeWhile(x => x < 2)(source)(fn);
    next(1);
    next(2);

    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('takeLast', () => {
  passesCloseAndEnd(operators.takeLast(0));

  it('emits the last max values of an ended source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const values = [];

    let talkback;
    // Raw sink that records every signal and pulls after each non-End signal
    operators.takeLast(1)(source)(signal => {
      values.push(signal);
      if (deriving.isStart(signal))
        talkback = deriving.unboxStart(signal);
      if (!deriving.isEnd(signal))
        talkback(deriving.pull);
    });

    next(1);
    next(2);

    expect(values.length).toBe(0);
    complete();

    expect(values).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(2),
      deriving.end(),
    ]);
  });
});

describe('throttle', () => {
  // Throttle duration callback that always returns 0
  const noop = web.throttle(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.throttle(() => 100)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    next(2);
    expect(fn).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    next(3);
    expect(fn).toHaveBeenCalledWith(3);
  });
});