Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import * as deriving from './helpers/wonka_deriving'; 2import * as sources from './wonka_sources.gen'; 3import * as sinks from './wonka_sinks.gen'; 4import * as operators from './wonka_operators.gen'; 5import * as web from './web/wonkaJs.gen'; 6import * as types from './wonka_types.gen'; 7 8/* This tests a noop operator for passive Pull talkback signals. 9 A Pull will be sent from the sink upwards and should pass through 10 the operator until the source receives it, which then pushes a 11 value down. */ 12const passesPassivePull = ( 13 operator: types.operatorT<any, any>, 14 output: any = 0 15) => 16 it('responds to Pull talkback signals (spec)', () => { 17 let talkback = null; 18 let push = 0; 19 const values = []; 20 21 const source: types.sourceT<any> = sink => { 22 sink(deriving.start(tb => { 23 if (!push && tb === deriving.pull) { 24 push++; 25 sink(deriving.push(0)); 26 } 27 })); 28 }; 29 30 const sink: types.sinkT<any> = signal => { 31 expect(deriving.isEnd(signal)).toBeFalsy(); 32 if (deriving.isPush(signal)) { 33 values.push(deriving.unboxPush(signal)); 34 } else if (deriving.isStart(signal)) { 35 talkback = deriving.unboxStart(signal); 36 } 37 }; 38 39 operator(source)(sink); 40 // The Start signal should always come in immediately 41 expect(talkback).not.toBe(null); 42 // No Push signals should be issued initially 43 expect(values).toEqual([]); 44 45 // When pulling a value we expect an immediate response 46 talkback(deriving.pull); 47 jest.runAllTimers(); 48 expect(values).toEqual([output]); 49 }); 50 51/* This tests a noop operator for regular, active Push signals. 52 A Push will be sent downwards from the source, through the 53 operator to the sink. Pull events should be let through from 54 the sink after every Push event. */ 55const passesActivePush = ( 56 operator: types.operatorT<any, any>, 57 result: any = 0 58) => 59 it('responds to eager Push signals (spec)', () => { 60 const values = []; 61 let talkback = null; 62 let push = null; 63 let pulls = 0; 64 65 const source: types.sourceT<any> = sink => { 66 push = (value: any) => sink(deriving.push(value)); 67 sink(deriving.start(tb => { 68 if (tb === deriving.pull) 69 pulls++; 70 })); 71 }; 72 73 const sink: types.sinkT<any> = signal => { 74 expect(deriving.isEnd(signal)).toBeFalsy(); 75 if (deriving.isStart(signal)) { 76 talkback = deriving.unboxStart(signal); 77 } else if (deriving.isPush(signal)) { 78 values.push(deriving.unboxPush(signal)); 79 talkback(deriving.pull); 80 } 81 }; 82 83 operator(source)(sink); 84 // No Pull signals should be issued initially 85 expect(pulls).toBe(0); 86 87 // When pushing a value we expect an immediate response 88 push(0); 89 jest.runAllTimers(); 90 expect(values).toEqual([result]); 91 // Subsequently the Pull signal should have travelled upwards 92 expect(pulls).toBe(1); 93 }); 94 95/* This tests a noop operator for Close talkback signals from the sink. 96 A Close signal will be sent, which should be forwarded to the source, 97 which then ends the communication without sending an End signal. */ 98const passesSinkClose = (operator: types.operatorT<any, any>) => 99 it('responds to Close signals from sink (spec)', () => { 100 let talkback = null; 101 let closing = 0; 102 let pulls = 0; 103 104 const source: types.sourceT<any> = sink => { 105 sink(deriving.start(tb => { 106 if (tb === deriving.pull) { 107 pulls++; 108 if (!closing) sink(deriving.push(0)); 109 } if (tb === deriving.close) { 110 closing++; 111 } 112 })); 113 }; 114 115 const sink: types.sinkT<any> = signal => { 116 expect(deriving.isEnd(signal)).toBeFalsy(); 117 if (deriving.isStart(signal)) { 118 talkback = deriving.unboxStart(signal); 119 } else if (deriving.isPush(signal)) { 120 talkback(deriving.close); 121 } 122 }; 123 124 operator(source)(sink); 125 126 // When pushing a value we expect an immediate close signal 127 talkback(deriving.pull); 128 jest.runAllTimers(); 129 expect(closing).toBe(1); 130 expect(pulls).toBe(1); 131 }); 132 133/* This tests a noop operator for End signals from the source. 134 A Push and End signal will be sent after the first Pull talkback 135 signal from the sink, which shouldn't lead to any extra Close or Pull 136 talkback signals. */ 137const passesSourceEnd = ( 138 operator: types.operatorT<any, any>, 139 result: any = 0 140) => 141 it('passes on End signals from source (spec)', () => { 142 const signals = []; 143 let talkback = null; 144 let pulls = 0; 145 let ending = 0; 146 147 const source: types.sourceT<any> = sink => { 148 sink(deriving.start(tb => { 149 expect(tb).not.toBe(deriving.close); 150 if (tb === deriving.pull) pulls++; 151 if (pulls === 1) { 152 sink(deriving.push(0)); 153 sink(deriving.end()); 154 } 155 })); 156 }; 157 158 const sink: types.sinkT<any> = signal => { 159 if (deriving.isStart(signal)) { 160 talkback = deriving.unboxStart(signal); 161 } else { 162 signals.push(signal); 163 if (deriving.isEnd(signal)) ending++; 164 } 165 }; 166 167 operator(source)(sink); 168 169 // When pushing a value we expect an immediate Push then End signal 170 talkback(deriving.pull); 171 jest.runAllTimers(); 172 expect(pulls).toBe(1); 173 expect(ending).toBe(1); 174 expect(signals).toEqual([deriving.push(result), deriving.end()]); 175 }); 176 177/* This tests a noop operator for Start signals from the source. 178 When the operator's sink is started by the source it'll receive 179 a Start event. As a response it should never send more than one 180 Start signals to the sink. */ 181const passesSingleStart = (operator: types.operatorT<any, any>) => 182 it('sends a single Start event to the incoming sink (spec)', () => { 183 let start = 0; 184 185 const source: types.sourceT<any> = sink => { 186 sink(deriving.start(() => {})); 187 }; 188 189 const sink: types.sinkT<any> = signal => { 190 if (deriving.isStart(signal)) start++; 191 }; 192 193 // When starting the operator we expect a single start event on the sink 194 operator(source)(sink); 195 expect(start).toBe(1); 196 }); 197 198/* This tests a noop operator for silence after End signals from the source. 199 When the operator receives the End signal it shouldn't forward any other 200 signals to the sink anymore. 201 This isn't a strict requirement, but some operators should ensure that 202 all sources are well behaved. This is particularly true for operators 203 that either Close sources themselves or may operate on multiple sources. */ 204const passesStrictEnd = (operator: types.operatorT<any, any>) => { 205 it('stops all signals after End has been received (spec: strict end)', () => { 206 let pulls = 0; 207 const signals = []; 208 209 const source: types.sourceT<any> = sink => { 210 sink(deriving.start(tb => { 211 if (tb === deriving.pull) { 212 pulls++; 213 sink(deriving.end()); 214 sink(deriving.push(123)); 215 } 216 })); 217 }; 218 219 const sink: types.sinkT<any> = signal => { 220 if (deriving.isStart(signal)) { 221 deriving.unboxStart(signal)(deriving.pull); 222 } else { 223 signals.push(signal); 224 } 225 }; 226 227 operator(source)(sink); 228 229 // The Push signal should've been dropped 230 jest.runAllTimers(); 231 expect(signals).toEqual([deriving.end()]); 232 expect(pulls).toBe(1); 233 }); 234 235 it('stops all signals after Close has been received (spec: strict close)', () => { 236 const signals = []; 237 238 const source: types.sourceT<any> = sink => { 239 sink(deriving.start(tb => { 240 if (tb === deriving.close) { 241 sink(deriving.push(123)); 242 } 243 })); 244 }; 245 246 const sink: types.sinkT<any> = signal => { 247 if (deriving.isStart(signal)) { 248 deriving.unboxStart(signal)(deriving.close); 249 } else { 250 signals.push(signal); 251 } 252 }; 253 254 operator(source)(sink); 255 256 // The Push signal should've been dropped 257 jest.runAllTimers(); 258 expect(signals).toEqual([]); 259 }); 260}; 261 262/* This tests an immediately closing operator for End signals to 263 the sink and Close signals to the source. 264 When an operator closes immediately we expect to see a Close 265 signal at the source and an End signal to the sink, since the 266 closing operator is expected to end the entire chain. */ 267const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) => 268 it('closes the source and ends the sink correctly (spec: ending operator)', () => { 269 let closing = 0; 270 let ending = 0; 271 272 const source: types.sourceT<any> = sink => { 273 sink(deriving.start(tb => { 274 // For some operator tests we do need to send a single value 275 if (tb === deriving.pull) 276 sink(deriving.push(null)); 277 if (tb === deriving.close) 278 closing++; 279 })); 280 }; 281 282 const sink: types.sinkT<any> = signal => { 283 if (deriving.isStart(signal)) { 284 deriving.unboxStart(signal)(deriving.pull); 285 } if (deriving.isEnd(signal)) { 286 ending++; 287 } 288 }; 289 290 // We expect the operator to immediately end and close 291 closingOperator(source)(sink); 292 expect(closing).toBe(1); 293 expect(ending).toBe(1); 294 }); 295 296const passesAsyncSequence = ( 297 operator: types.operatorT<any, any>, 298 result: any = 0 299) => 300 it('passes an async push with an async end (spec)', () => { 301 let hasPushed = false; 302 const signals = []; 303 304 const source: types.sourceT<any> = sink => { 305 sink(deriving.start(tb => { 306 if (tb === deriving.pull && !hasPushed) { 307 hasPushed = true; 308 setTimeout(() => sink(deriving.push(0)), 10); 309 setTimeout(() => sink(deriving.end()), 20); 310 } 311 })); 312 }; 313 314 const sink: types.sinkT<any> = signal => { 315 if (deriving.isStart(signal)) { 316 setTimeout(() => { 317 deriving.unboxStart(signal)(deriving.pull); 318 }, 5); 319 } else { 320 signals.push(signal); 321 } 322 }; 323 324 // We initially expect to see the push signal 325 // Afterwards after all timers all other signals come in 326 operator(source)(sink); 327 expect(signals.length).toBe(0); 328 jest.advanceTimersByTime(5); 329 expect(hasPushed).toBeTruthy(); 330 jest.runAllTimers(); 331 332 expect(signals).toEqual([ 333 deriving.push(result), 334 deriving.end() 335 ]); 336 }); 337 338beforeEach(() => { 339 jest.useFakeTimers(); 340}); 341 342describe('combine', () => { 343 const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source); 344 345 passesPassivePull(noop, [0, 0]); 346 passesActivePush(noop, [0, 0]); 347 passesSinkClose(noop); 348 passesSourceEnd(noop, [0, 0]); 349 passesSingleStart(noop); 350 passesStrictEnd(noop); 351 352 it('emits the zipped values of two sources', () => { 353 const { source: sourceA, next: nextA } = sources.makeSubject(); 354 const { source: sourceB, next: nextB } = sources.makeSubject(); 355 const fn = jest.fn(); 356 357 sinks.forEach(fn)(operators.combine(sourceA, sourceB)); 358 359 nextA(1); 360 expect(fn).not.toHaveBeenCalled(); 361 nextB(2); 362 expect(fn).toHaveBeenCalledWith([1, 2]); 363 }); 364}); 365 366describe('buffer', () => { 367 const valueThenNever: types.sourceT<any> = sink => 368 sink(deriving.start(tb => { 369 if (tb === deriving.pull) 370 sink(deriving.push(null)); 371 })); 372 373 const noop = operators.buffer(valueThenNever); 374 375 passesPassivePull(noop, [0]); 376 passesActivePush(noop, [0]); 377 passesSinkClose(noop); 378 passesSourceEnd(noop, [0]); 379 passesSingleStart(noop); 380 passesStrictEnd(noop); 381 382 it('emits batches of input values when a notifier emits', () => { 383 const { source: notifier$, next: notify } = sources.makeSubject(); 384 const { source: input$, next } = sources.makeSubject(); 385 const fn = jest.fn(); 386 387 sinks.forEach(fn)(operators.buffer(notifier$)(input$)); 388 389 next(1); 390 next(2); 391 expect(fn).not.toHaveBeenCalled(); 392 393 notify(null); 394 expect(fn).toHaveBeenCalledWith([1, 2]); 395 396 next(3); 397 notify(null); 398 expect(fn).toHaveBeenCalledWith([3]); 399 }); 400}); 401 402describe('concatMap', () => { 403 const noop = operators.concatMap(x => sources.fromValue(x)); 404 passesPassivePull(noop); 405 passesActivePush(noop); 406 passesSinkClose(noop); 407 passesSourceEnd(noop); 408 passesSingleStart(noop); 409 passesStrictEnd(noop); 410 passesAsyncSequence(noop); 411 412 // This synchronous test for concatMap will behave the same as mergeMap & switchMap 413 it('emits values from each flattened synchronous source', () => { 414 const { source, next, complete } = sources.makeSubject<number>(); 415 const fn = jest.fn(); 416 417 operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 418 419 next(1); 420 next(3); 421 complete(); 422 423 expect(fn).toHaveBeenCalledTimes(6); 424 expect(fn.mock.calls).toEqual([ 425 [deriving.start(expect.any(Function))], 426 [deriving.push(1)], 427 [deriving.push(2)], 428 [deriving.push(3)], 429 [deriving.push(4)], 430 [deriving.end()], 431 ]); 432 }); 433 434 // This asynchronous test for concatMap will behave differently than mergeMap & switchMap 435 it('emits values from each flattened asynchronous source, one at a time', () => { 436 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 437 const fn = jest.fn(); 438 439 sinks.forEach(fn)( 440 operators.concatMap((x: number) => { 441 return web.delay(5)(sources.fromArray([x, x * 2])); 442 })(source) 443 ); 444 445 jest.advanceTimersByTime(14); 446 expect(fn.mock.calls).toEqual([ 447 [1], 448 [2], 449 ]); 450 451 jest.runAllTimers(); 452 expect(fn.mock.calls).toEqual([ 453 [1], 454 [2], 455 [10], 456 [20], 457 ]); 458 }); 459 460 it('emits synchronous values in order', () => { 461 const values = []; 462 463 sinks.forEach(x => values.push(x))( 464 operators.concat([ 465 sources.fromArray([1, 2]), 466 sources.fromArray([3, 4]) 467 ]) 468 ); 469 470 expect(values).toEqual([ 1, 2, 3, 4 ]); 471 }); 472}); 473 474describe('debounce', () => { 475 const noop = web.debounce(() => 0); 476 passesPassivePull(noop); 477 passesActivePush(noop); 478 passesSinkClose(noop); 479 passesSourceEnd(noop); 480 passesSingleStart(noop); 481 passesStrictEnd(noop); 482 passesAsyncSequence(noop); 483 484 it('waits for a specified amount of silence before emitting the last value', () => { 485 const { source, next } = sources.makeSubject<number>(); 486 const fn = jest.fn(); 487 488 sinks.forEach(fn)(web.debounce(() => 100)(source)); 489 490 next(1); 491 jest.advanceTimersByTime(50); 492 expect(fn).not.toHaveBeenCalled(); 493 494 next(2); 495 jest.advanceTimersByTime(99); 496 expect(fn).not.toHaveBeenCalled(); 497 498 jest.advanceTimersByTime(1); 499 expect(fn).toHaveBeenCalledWith(2); 500 }); 501 502 it('emits debounced value with delayed End signal', () => { 503 const { source, next, complete } = sources.makeSubject<number>(); 504 const fn = jest.fn(); 505 506 sinks.forEach(fn)(web.debounce(() => 100)(source)); 507 508 next(1); 509 complete(); 510 jest.advanceTimersByTime(100); 511 expect(fn).toHaveBeenCalled(); 512 }); 513}); 514 515describe('delay', () => { 516 const noop = web.delay(0); 517 passesPassivePull(noop); 518 passesActivePush(noop); 519 passesSinkClose(noop); 520 passesSourceEnd(noop); 521 passesSingleStart(noop); 522 passesAsyncSequence(noop); 523 524 it('delays outputs by a specified delay timeout value', () => { 525 const { source, next } = sources.makeSubject(); 526 const fn = jest.fn(); 527 528 sinks.forEach(fn)(web.delay(100)(source)); 529 530 next(1); 531 expect(fn).not.toHaveBeenCalled(); 532 533 jest.advanceTimersByTime(100); 534 expect(fn).toHaveBeenCalledWith(1); 535 }); 536}); 537 538describe('filter', () => { 539 const noop = operators.filter(() => true); 540 passesPassivePull(noop); 541 passesActivePush(noop); 542 passesSinkClose(noop); 543 passesSourceEnd(noop); 544 passesSingleStart(noop); 545 passesAsyncSequence(noop); 546 547 it('prevents emissions for which a predicate fails', () => { 548 const { source, next } = sources.makeSubject(); 549 const fn = jest.fn(); 550 551 sinks.forEach(fn)(operators.filter(x => !!x)(source)); 552 553 next(false); 554 expect(fn).not.toHaveBeenCalled(); 555 556 next(true); 557 expect(fn).toHaveBeenCalledWith(true); 558 }); 559}); 560 561describe('map', () => { 562 const noop = operators.map(x => x); 563 passesPassivePull(noop); 564 passesActivePush(noop); 565 passesSinkClose(noop); 566 passesSourceEnd(noop); 567 passesSingleStart(noop); 568 passesAsyncSequence(noop); 569 570 it('maps over values given a transform function', () => { 571 const { source, next } = sources.makeSubject<number>(); 572 const fn = jest.fn(); 573 574 sinks.forEach(fn)(operators.map((x: number) => x + 1)(source)); 575 576 next(1); 577 expect(fn).toHaveBeenCalledWith(2); 578 }); 579}); 580 581describe('mergeMap', () => { 582 const noop = operators.mergeMap(x => sources.fromValue(x)); 583 passesPassivePull(noop); 584 passesActivePush(noop); 585 passesSinkClose(noop); 586 passesSourceEnd(noop); 587 passesSingleStart(noop); 588 passesStrictEnd(noop); 589 passesAsyncSequence(noop); 590 591 // This synchronous test for mergeMap will behave the same as concatMap & switchMap 592 it('emits values from each flattened synchronous source', () => { 593 const { source, next, complete } = sources.makeSubject<number>(); 594 const fn = jest.fn(); 595 596 operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 597 598 next(1); 599 next(3); 600 complete(); 601 602 expect(fn).toHaveBeenCalledTimes(6); 603 expect(fn.mock.calls).toEqual([ 604 [deriving.start(expect.any(Function))], 605 [deriving.push(1)], 606 [deriving.push(2)], 607 [deriving.push(3)], 608 [deriving.push(4)], 609 [deriving.end()], 610 ]); 611 }); 612 613 // This asynchronous test for mergeMap will behave differently than concatMap & switchMap 614 it('emits values from each flattened asynchronous source simultaneously', () => { 615 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 616 const fn = jest.fn(); 617 618 sinks.forEach(fn)( 619 operators.mergeMap((x: number) => { 620 return web.delay(5)(sources.fromArray([x, x * 2])); 621 })(source) 622 ); 623 624 jest.runAllTimers(); 625 expect(fn.mock.calls).toEqual([ 626 [1], 627 [2], 628 [10], 629 [20], 630 ]); 631 }); 632 633 it('emits synchronous values in order', () => { 634 const values = []; 635 636 sinks.forEach(x => values.push(x))( 637 operators.merge([ 638 sources.fromArray([1, 2]), 639 sources.fromArray([3, 4]) 640 ]) 641 ); 642 643 expect(values).toEqual([ 1, 2, 3, 4 ]); 644 }); 645}); 646 647describe('onEnd', () => { 648 const noop = operators.onEnd(() => {}); 649 passesPassivePull(noop); 650 passesActivePush(noop); 651 passesSinkClose(noop); 652 passesSourceEnd(noop); 653 passesSingleStart(noop); 654 passesAsyncSequence(noop); 655 656 it('calls a callback when the source ends', () => { 657 const { source, next, complete } = sources.makeSubject<number>(); 658 const fn = jest.fn(); 659 660 sinks.forEach(() => {})(operators.onEnd(fn)(source)); 661 662 next(null); 663 expect(fn).not.toHaveBeenCalled(); 664 665 complete(); 666 expect(fn).toHaveBeenCalled(); 667 }); 668}); 669 670describe('onPush', () => { 671 const noop = operators.onPush(() => {}); 672 passesPassivePull(noop); 673 passesActivePush(noop); 674 passesSinkClose(noop); 675 passesSourceEnd(noop); 676 passesSingleStart(noop); 677 passesAsyncSequence(noop); 678 679 it('calls a callback when the source emits', () => { 680 const { source, next } = sources.makeSubject<number>(); 681 const fn = jest.fn(); 682 683 sinks.forEach(() => {})(operators.onPush(fn)(source)); 684 685 next(1); 686 expect(fn).toHaveBeenCalledWith(1); 687 next(2); 688 expect(fn).toHaveBeenCalledWith(2); 689 }); 690 691 it('is the same as `tap`', () => { 692 expect(operators.onPush).toBe(operators.tap); 693 }); 694}); 695 696describe('onStart', () => { 697 const noop = operators.onStart(() => {}); 698 passesPassivePull(noop); 699 passesActivePush(noop); 700 passesSinkClose(noop); 701 passesSourceEnd(noop); 702 passesSingleStart(noop); 703 passesAsyncSequence(noop); 704 705 it('is called when the source starts', () => { 706 let sink: types.sinkT<any>; 707 708 const fn = jest.fn(); 709 const source: types.sourceT<any> = _sink => { sink = _sink; }; 710 711 sinks.forEach(() => {})(operators.onStart(fn)(source)); 712 713 expect(fn).not.toHaveBeenCalled(); 714 715 sink(deriving.start(() => {})); 716 expect(fn).toHaveBeenCalled(); 717 }); 718}); 719 720describe('sample', () => { 721 const valueThenNever: types.sourceT<any> = sink => 722 sink(deriving.start(tb => { 723 if (tb === deriving.pull) 724 sink(deriving.push(null)); 725 })); 726 727 const noop = operators.sample(valueThenNever); 728 729 passesPassivePull(noop); 730 passesActivePush(noop); 731 passesSinkClose(noop); 732 passesSourceEnd(noop); 733 passesSingleStart(noop); 734 passesStrictEnd(noop); 735 736 it('emits the latest value when a notifier source emits', () => { 737 const { source: notifier$, next: notify } = sources.makeSubject(); 738 const { source: input$, next } = sources.makeSubject(); 739 const fn = jest.fn(); 740 741 sinks.forEach(fn)(operators.sample(notifier$)(input$)); 742 743 next(1); 744 next(2); 745 expect(fn).not.toHaveBeenCalled(); 746 747 notify(null); 748 expect(fn).toHaveBeenCalledWith(2); 749 }); 750}); 751 752describe('scan', () => { 753 const noop = operators.scan((_acc, x) => x, null); 754 passesPassivePull(noop); 755 passesActivePush(noop); 756 passesSinkClose(noop); 757 passesSourceEnd(noop); 758 passesSingleStart(noop); 759 passesAsyncSequence(noop); 760 761 it('folds values continuously with a reducer and initial value', () => { 762 const { source: input$, next } = sources.makeSubject<number>(); 763 const fn = jest.fn(); 764 765 const reducer = (acc: number, x: number) => acc + x; 766 sinks.forEach(fn)(operators.scan(reducer, 0)(input$)); 767 768 next(1); 769 expect(fn).toHaveBeenCalledWith(1); 770 next(2); 771 expect(fn).toHaveBeenCalledWith(3); 772 }); 773}); 774 775describe('share', () => { 776 const noop = operators.share; 777 passesPassivePull(noop); 778 passesActivePush(noop); 779 passesSinkClose(noop); 780 passesSourceEnd(noop); 781 passesSingleStart(noop); 782 passesStrictEnd(noop); 783 passesAsyncSequence(noop); 784 785 it('shares output values between sinks', () => { 786 let push = () => {}; 787 788 const source: types.sourceT<any> = operators.share(sink => { 789 sink(deriving.start(() => {})); 790 push = () => { 791 sink(deriving.push([0])); 792 sink(deriving.end()); 793 }; 794 }); 795 796 const fnA = jest.fn(); 797 const fnB = jest.fn(); 798 799 sinks.forEach(fnA)(source); 800 sinks.forEach(fnB)(source); 801 push(); 802 803 expect(fnA).toHaveBeenCalledWith([0]); 804 expect(fnB).toHaveBeenCalledWith([0]); 805 expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]); 806 }); 807}); 808 809describe('skip', () => { 810 const noop = operators.skip(0); 811 passesPassivePull(noop); 812 passesActivePush(noop); 813 passesSinkClose(noop); 814 passesSourceEnd(noop); 815 passesSingleStart(noop); 816 passesAsyncSequence(noop); 817 818 it('skips a number of values before emitting normally', () => { 819 const { source, next } = sources.makeSubject<number>(); 820 const fn = jest.fn(); 821 822 sinks.forEach(fn)(operators.skip(1)(source)); 823 824 next(1); 825 expect(fn).not.toHaveBeenCalled(); 826 next(2); 827 expect(fn).toHaveBeenCalledWith(2); 828 }); 829}); 830 831describe('skipUntil', () => { 832 const noop = operators.skipUntil(sources.fromValue(null)); 833 passesPassivePull(noop); 834 passesActivePush(noop); 835 passesSinkClose(noop); 836 passesSourceEnd(noop); 837 passesSingleStart(noop); 838 passesAsyncSequence(noop); 839 passesStrictEnd(noop); 840 841 it('skips values until the notifier source emits', () => { 842 const { source: notifier$, next: notify } = sources.makeSubject(); 843 const { source: input$, next } = sources.makeSubject<number>(); 844 const fn = jest.fn(); 845 846 sinks.forEach(fn)(operators.skipUntil(notifier$)(input$)); 847 848 next(1); 849 expect(fn).not.toHaveBeenCalled(); 850 notify(null); 851 next(2); 852 expect(fn).toHaveBeenCalledWith(2); 853 }); 854}); 855 856describe('skipWhile', () => { 857 const noop = operators.skipWhile(() => false); 858 passesPassivePull(noop); 859 passesActivePush(noop); 860 passesSinkClose(noop); 861 passesSourceEnd(noop); 862 passesSingleStart(noop); 863 passesAsyncSequence(noop); 864 865 it('skips values until one fails a predicate', () => { 866 const { source, next } = sources.makeSubject<number>(); 867 const fn = jest.fn(); 868 869 sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source)); 870 871 next(1); 872 expect(fn).not.toHaveBeenCalled(); 873 next(2); 874 expect(fn).toHaveBeenCalledWith(2); 875 }); 876}); 877 878describe('switchMap', () => { 879 const noop = operators.switchMap(x => sources.fromValue(x)); 880 passesPassivePull(noop); 881 passesActivePush(noop); 882 passesSinkClose(noop); 883 passesSourceEnd(noop); 884 passesSingleStart(noop); 885 passesStrictEnd(noop); 886 passesAsyncSequence(noop); 887 888 // This synchronous test for switchMap will behave the same as concatMap & mergeMap 889 it('emits values from each flattened synchronous source', () => { 890 const { source, next, complete } = sources.makeSubject<number>(); 891 const fn = jest.fn(); 892 893 operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); 894 895 next(1); 896 next(3); 897 complete(); 898 899 expect(fn).toHaveBeenCalledTimes(6); 900 expect(fn.mock.calls).toEqual([ 901 [deriving.start(expect.any(Function))], 902 [deriving.push(1)], 903 [deriving.push(2)], 904 [deriving.push(3)], 905 [deriving.push(4)], 906 [deriving.end()], 907 ]); 908 }); 909 910 // This asynchronous test for switchMap will behave differently than concatMap & mergeMap 911 it('emits values from each flattened asynchronous source, one at a time', () => { 912 const source = web.delay<number>(4)(sources.fromArray([1, 10])); 913 const fn = jest.fn(); 914 915 sinks.forEach(fn)( 916 operators.switchMap((x: number) => ( 917 operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5))) 918 ))(source) 919 ); 920 921 jest.runAllTimers(); 922 expect(fn.mock.calls).toEqual([ 923 [1], 924 [10], 925 [20], 926 ]); 927 }); 928}); 929 930describe('take', () => { 931 const noop = operators.take(10); 932 passesPassivePull(noop); 933 passesActivePush(noop); 934 passesSinkClose(noop); 935 passesSourceEnd(noop); 936 passesSingleStart(noop); 937 passesStrictEnd(noop); 938 passesAsyncSequence(noop); 939 940 passesCloseAndEnd(operators.take(0)); 941 942 it('emits values until a maximum is reached', () => { 943 const { source, next } = sources.makeSubject<number>(); 944 const fn = jest.fn(); 945 946 operators.take(1)(source)(fn); 947 next(1); 948 949 expect(fn).toHaveBeenCalledTimes(3); 950 expect(fn.mock.calls).toEqual([ 951 [deriving.start(expect.any(Function))], 952 [deriving.push(1)], 953 [deriving.end()], 954 ]); 955 }); 956}); 957 958describe('takeUntil', () => { 959 const noop = operators.takeUntil(sources.never); 960 passesPassivePull(noop); 961 passesActivePush(noop); 962 passesSinkClose(noop); 963 passesSourceEnd(noop); 964 passesSingleStart(noop); 965 passesStrictEnd(noop); 966 passesAsyncSequence(noop); 967 968 const ending = operators.takeUntil(sources.fromValue(null)); 969 passesCloseAndEnd(ending); 970 971 it('emits values until a notifier emits', () => { 972 const { source: notifier$, next: notify } = sources.makeSubject<number>(); 973 const { source: input$, next } = sources.makeSubject<number>(); 974 const fn = jest.fn(); 975 976 operators.takeUntil(notifier$)(input$)(fn); 977 next(1); 978 979 expect(fn).toHaveBeenCalledTimes(2); 980 expect(fn.mock.calls).toEqual([ 981 [deriving.start(expect.any(Function))], 982 [deriving.push(1)], 983 ]); 984 985 notify(null); 986 expect(fn).toHaveBeenCalledTimes(3); 987 expect(fn.mock.calls[2][0]).toEqual(deriving.end()); 988 }); 989}); 990 991describe('takeWhile', () => { 992 const noop = operators.takeWhile(() => true); 993 passesPassivePull(noop); 994 passesActivePush(noop); 995 passesSinkClose(noop); 996 passesSourceEnd(noop); 997 passesSingleStart(noop); 998 passesAsyncSequence(noop); 999 1000 const ending = operators.takeWhile(() => false); 1001 passesCloseAndEnd(ending); 1002 1003 it('emits values while a predicate passes for all values', () => { 1004 const { source, next } = sources.makeSubject<number>(); 1005 const fn = jest.fn(); 1006 1007 operators.takeWhile(x => x < 2)(source)(fn); 1008 next(1); 1009 next(2); 1010 1011 expect(fn.mock.calls).toEqual([ 1012 [deriving.start(expect.any(Function))], 1013 [deriving.push(1)], 1014 [deriving.end()], 1015 ]); 1016 }); 1017}); 1018 1019describe('takeLast', () => { 1020 passesCloseAndEnd(operators.takeLast(0)); 1021 1022 it('emits the last max values of an ended source', () => { 1023 const { source, next, complete } = sources.makeSubject<number>(); 1024 const values = []; 1025 1026 let talkback; 1027 operators.takeLast(1)(source)(signal => { 1028 values.push(signal); 1029 if (deriving.isStart(signal)) 1030 talkback = deriving.unboxStart(signal); 1031 if (!deriving.isEnd(signal)) 1032 talkback(deriving.pull); 1033 }); 1034 1035 next(1); 1036 next(2); 1037 1038 expect(values.length).toBe(0); 1039 complete(); 1040 1041 expect(values).toEqual([ 1042 deriving.start(expect.any(Function)), 1043 deriving.push(2), 1044 deriving.end(), 1045 ]); 1046 }); 1047}); 1048 1049describe('throttle', () => { 1050 const noop = web.throttle(() => 0); 1051 passesPassivePull(noop); 1052 passesActivePush(noop); 1053 passesSinkClose(noop); 1054 passesSourceEnd(noop); 1055 passesSingleStart(noop); 1056 passesAsyncSequence(noop); 1057 1058 it('should ignore emissions for a period of time after a value', () => { 1059 const { source, next } = sources.makeSubject<number>(); 1060 const fn = jest.fn(); 1061 1062 sinks.forEach(fn)(web.throttle(() => 100)(source)); 1063 1064 next(1); 1065 expect(fn).toHaveBeenCalledWith(1); 1066 jest.advanceTimersByTime(50); 1067 1068 next(2); 1069 expect(fn).toHaveBeenCalledTimes(1); 1070 jest.advanceTimersByTime(50); 1071 1072 next(3); 1073 expect(fn).toHaveBeenCalledWith(3); 1074 }); 1075});