Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import * as deriving from './helpers/wonka_deriving';
import * as sources from './wonka_sources.gen';
import * as sinks from './wonka_sinks.gen';
import * as operators from './wonka_operators.gen';
import * as web from './web/wonkaJs.gen';
import * as types from './wonka_types.gen';

/* This tests a noop operator for passive Pull talkback signals.
   A Pull will be sent from the sink upwards and should pass through
   the operator until the source receives it, which then pushes a
   value down.

   operator: the operator under test
   output:   the value the sink is expected to receive for the pushed 0 */
const passesPassivePull = (
  operator: types.operatorT<any, any>,
  output: any = 0
) =>
  it('responds to Pull talkback signals (spec)', () => {
    let talkback = null;
    const values = [];

    // The source only pushes a value in response to a Pull
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          sink(deriving.push(0));
        }
      }));
    };

    // The sink records pushed values and keeps the talkback; it must never see End
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isPush(signal)) {
        values.push(deriving.unboxPush(signal));
      } else if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
    };

    operator(source)(sink);
    // The Start signal should always come in immediately
    expect(talkback).not.toBe(null);
    // No Push signals should be issued initially
    expect(values).toEqual([]);

    // When pulling a value we expect an immediate response
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(values).toEqual([output]);
  });

/* This tests a noop operator for regular, active Push signals.
   A Push will be sent downwards from the source, through the
   operator to the sink. Pull events should be let through from
   the sink after every Push event.

   operator: the operator under test
   result:   the value the sink is expected to receive for the pushed 0 */
const passesActivePush = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('responds to eager Push signals (spec)', () => {
    const values = [];
    let talkback = null;
    let push = null;
    let pulls = 0;

    // The source exposes `push` to the test body and counts incoming Pulls
    const source: types.sourceT<any> = sink => {
      push = (value: any) => sink(deriving.push(value));
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          pulls++;
        }
      }));
    };

    // The sink answers every Push with a Pull; it must never see End
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        values.push(deriving.unboxPush(signal));
        talkback(deriving.pull);
      }
    };

    operator(source)(sink);
    // No Pull signals should be issued initially
    expect(pulls).toBe(0);

    // When pushing a value we expect an immediate response
    push(0);
    jest.runAllTimers();
    expect(values).toEqual([result]);
    // Subsequently the Pull signal should have travelled upwards
    expect(pulls).toBe(1);
  });

/* This tests a noop operator for Close talkback signals from the sink.
   A Close signal will be sent, which should be forwarded to the source,
   which then ends the communication without sending an End signal.

   operator: the operator under test */
const passesSinkClose = (operator: types.operatorT<any, any>) =>
  it('responds to Close signals from sink (spec)', () => {
    let talkback = null;
    let closing = 0;
    let pulls = 0;

    // The source counts Pull and Close signals and stops pushing once closed
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          pulls++;
          if (!closing) sink(deriving.push(0));
        }
        if (tb === deriving.close) {
          closing++;
        }
      }));
    };

    // The sink closes as soon as it receives a Push; it must never see End
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        talkback(deriving.close);
      }
    };

    operator(source)(sink);

    // Pulling makes the source push a value, which the sink answers with Close
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(closing).toBe(1);
    expect(pulls).toBe(1);
  });

/* This tests a noop operator for End signals from the source.
   A Push and End signal will be sent after the first Pull talkback
   signal from the sink, which shouldn't lead to any extra Close or Pull
   talkback signals.

   operator: the operator under test
   result:   the value expected inside the Push signal that reaches the sink */
const passesSourceEnd = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes on End signals from source (spec)', () => {
    const signals = [];
    let talkback = null;
    let pulls = 0;
    let ending = 0;

    // The source must never be closed; on the first Pull it pushes once and ends
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        expect(tb).not.toBe(deriving.close);
        if (tb === deriving.pull) pulls++;
        if (pulls === 1) {
          sink(deriving.push(0));
          sink(deriving.end());
        }
      }));
    };

    // The sink records every non-Start signal and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else {
        signals.push(signal);
        if (deriving.isEnd(signal)) ending++;
      }
    };

    operator(source)(sink);

    // After pulling we expect an immediate Push followed by an End signal
    talkback(deriving.pull);
    jest.runAllTimers();
    expect(pulls).toBe(1);
    expect(ending).toBe(1);
    expect(signals).toEqual([deriving.push(result), deriving.end()]);
  });

/* This tests a noop operator for Start signals from the source.
   When the operator's sink is started by the source it'll receive
   a Start event. As a response it should never send more than one
   Start signal to the sink.

   operator: the operator under test */
const passesSingleStart = (operator: types.operatorT<any, any>) =>
  it('sends a single Start event to the incoming sink (spec)', () => {
    let start = 0;

    const source: types.sourceT<any> = sink => {
      sink(deriving.start(() => {}));
    };

    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) start++;
    };

    // When starting the operator we expect a single start event on the sink
    operator(source)(sink);
    expect(start).toBe(1);
  });

/* This tests a noop operator for silence after End signals from the source.
   When the operator receives the End signal it shouldn't forward any other
   signals to the sink anymore.
   This isn't a strict requirement, but some operators should ensure that
   all sources are well behaved. This is particularly true for operators
   that either Close sources themselves or may operate on multiple sources.

   operator: the operator under test */
const passesStrictEnd = (operator: types.operatorT<any, any>) =>
  it('stops all signals after End has been received (spec: strict end)', () => {
    let pulls = 0;
    const signals = [];

    // A misbehaving source: it pushes a value after it has already ended
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull) {
          pulls++;
          sink(deriving.end());
          sink(deriving.push(0));
        }
      }));
    };

    // The sink pulls once on Start and records everything else
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      } else {
        signals.push(signal);
      }
    };

    operator(source)(sink);

    // The Push signal should've been dropped
    jest.runAllTimers();
    expect(signals).toEqual([deriving.end()]);
    expect(pulls).toBe(1);
  });

/* This tests an immediately closing operator for End signals to
   the sink and Close signals to the source.
   When an operator closes immediately we expect to see a Close
   signal at the source and an End signal to the sink, since the
   closing operator is expected to end the entire chain.

   closingOperator: an operator configured so that it ends right away */
const passesCloseAndEnd = (closingOperator: types.operatorT<any, any>) =>
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closing = 0;
    let ending = 0;

    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        // For some operator tests we do need to send a single value
        if (tb === deriving.pull) {
          sink(deriving.push(null));
        }
        if (tb === deriving.close) {
          closing++;
        }
      }));
    };

    // The sink pulls once on Start and counts End signals
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        deriving.unboxStart(signal)(deriving.pull);
      }
      if (deriving.isEnd(signal)) {
        ending++;
      }
    };

    // We expect the operator to immediately end and close
    closingOperator(source)(sink);
    expect(closing).toBe(1);
    expect(ending).toBe(1);
  });

/* This tests a noop operator for an asynchronous Push followed by an
   asynchronous End. The sink sends its Pull from a 5ms timer; on the
   first Pull the source schedules a Push on a 10ms timer and an End on
   a 20ms timer. The sink should receive exactly that Push and that End,
   in this order.

   operator: the operator under test
   result:   the value expected inside the Push signal that reaches the sink */
const passesAsyncSequence = (
  operator: types.operatorT<any, any>,
  result: any = 0
) =>
  it('passes an async push with an async end (spec)', () => {
    let hasPushed = false;
    const signals = [];

    // Only the first Pull schedules the delayed Push and End
    const source: types.sourceT<any> = sink => {
      sink(deriving.start(tb => {
        if (tb === deriving.pull && !hasPushed) {
          hasPushed = true;
          setTimeout(() => sink(deriving.push(0)), 10);
          setTimeout(() => sink(deriving.end()), 20);
        }
      }));
    };

    // The sink defers its Pull and records every non-Start signal
    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        setTimeout(() => {
          deriving.unboxStart(signal)(deriving.pull);
        }, 5);
      } else {
        signals.push(signal);
      }
    };

    // Nothing may reach the sink synchronously
    operator(source)(sink);
    expect(signals.length).toBe(0);
    // Once the sink's timer fires, the Pull must have reached the source
    jest.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    // After all remaining timers have run, the Push and the End have come in
    jest.runAllTimers();

    expect(signals).toEqual([
      deriving.push(result),
      deriving.end()
    ]);
  });

// Every test runs on Jest's fake timers, so the specs above can flush
// pending timers synchronously with jest.runAllTimers()
beforeEach(() => {
  jest.useFakeTimers();
});

// Each suite below first runs the shared spec helpers against a `noop`
// configuration of the operator, then adds operator-specific tests.
// Spec helpers marked `TODO` are currently disabled for that operator.

describe('combine', () => {
  // Combines a single-value source with the spec source, which is why the
  // expected spec outputs below are the pair [0, 0]
  const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source);

  passesPassivePull(noop, [0, 0]);
  // TODO: passesActivePush(noop, [0, 0]);
  passesSinkClose(noop);
  passesSourceEnd(noop, [0, 0]);
  passesSingleStart(noop);
  passesStrictEnd(noop);

  it('emits the zipped values of two sources', () => {
    const { source: sourceA, next: nextA } = sources.makeSubject();
    const { source: sourceB, next: nextB } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.combine(sourceA, sourceB));

    // Nothing is emitted until both sources have produced a value
    nextA(1);
    expect(fn).not.toHaveBeenCalled();
    nextB(2);
    expect(fn).toHaveBeenCalledWith([1, 2]);
  });
});

describe('buffer', () => {
  const noop = operators.buffer(
    operators.merge([
      sources.fromValue(null),
      sources.never
    ])
  );

  // TODO: passesPassivePull(noop, [0]);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  // TODO: passesSourceEnd(noop);
  passesSingleStart(noop);
  // TODO: passesStrictEnd(noop);

  it('emits batches of input values when a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.buffer(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith([1, 2]);
  });
});

describe('concatMap', () => {
  const noop = operators.concatMap(x => sources.fromValue(x));
  // TODO: passesPassivePull(noop);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  // TODO: passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for concatMap will behave the same as mergeMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This asynchronous test for concatMap will behave differently than mergeMap & switchMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.concatMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    // After 14ms only the first inner source has emitted its values
    jest.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([
      [1],
      [2],
    ]);

    // The second inner source's values follow once all timers have run
    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([
      [1],
      [2],
      [10],
      [20],
    ]);
  });
});

describe('debounce', () => {
  const noop = web.debounce(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  // TODO: passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('waits for a specified amount of silence before emitting the last value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.debounce(() => 100)(source));

    next(1);
    jest.advanceTimersByTime(50);
    expect(fn).not.toHaveBeenCalled();

    // A second value arrives before the first 100ms have elapsed;
    // 99ms later there is still no output
    next(2);
    jest.advanceTimersByTime(99);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('delay', () => {
  const noop = web.delay(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('delays outputs by a specified delay timeout value', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(web.delay(100)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();

    jest.advanceTimersByTime(100);
    expect(fn).toHaveBeenCalledWith(1);
  });
});

describe('filter', () => {
  const noop = operators.filter(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('prevents emissions for which a predicate fails', () => {
    const { source, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.filter(x => !!x)(source));

    next(false);
    expect(fn).not.toHaveBeenCalled();

    next(true);
    expect(fn).toHaveBeenCalledWith(true);
  });
});

describe('map', () => {
  const noop = operators.map(x => x);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  // TODO: passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('maps over values given a transform function', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.map((x: number) => x + 1)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('mergeMap', () => {
  const noop = operators.mergeMap(x => sources.fromValue(x));
  // TODO: passesPassivePull(noop);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  // TODO: passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for mergeMap will behave the same as concatMap & switchMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This asynchronous test for mergeMap will behave differently than concatMap & switchMap
  it('emits values from each flattened asynchronous source simultaneously', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.mergeMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    // The values of both inner sources interleave within the first 14ms
    jest.advanceTimersByTime(14);
    expect(fn.mock.calls).toEqual([
      [1],
      [10],
      [2],
      [20],
    ]);
  });
});

describe('onEnd', () => {
  const noop = operators.onEnd(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source ends', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onEnd(fn)(source));

    // The pushed value itself is irrelevant (the sink ignores it); a number
    // is used so that it matches the subject's declared type
    next(1);
    expect(fn).not.toHaveBeenCalled();

    complete();
    expect(fn).toHaveBeenCalled();
  });
});

describe('onPush', () => {
  const noop = operators.onPush(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('calls a callback when the source emits', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(() => {})(operators.onPush(fn)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });

  it('is the same as `tap`', () => {
    expect(operators.onPush).toBe(operators.tap);
  });
});

describe('onStart', () => {
  const noop = operators.onStart(() => {});
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('is called when the source starts', () => {
    let sink: types.sinkT<any>;

    const fn = jest.fn();
    // The source only captures its sink, so the test decides when Start is sent
    const source: types.sourceT<any> = _sink => { sink = _sink; };

    sinks.forEach(() => {})(operators.onStart(fn)(source));

    expect(fn).not.toHaveBeenCalled();

    sink(deriving.start(() => {}));
    expect(fn).toHaveBeenCalled();
  });
});

describe('sample', () => {
  const noop = operators.sample(sources.fromValue(null));
  // TODO: passesPassivePull(noop);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  // TODO: passesSourceEnd(noop);
  passesSingleStart(noop);
  // TODO: passesStrictEnd(noop);

  it('emits the latest value when a notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.sample(notifier$)(input$));

    next(1);
    next(2);
    expect(fn).not.toHaveBeenCalled();

    notify(null);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

// Each suite below first runs the shared spec helpers against a `noop`
// configuration of the operator, then adds operator-specific tests.
// Spec helpers marked `TODO` are currently disabled for that operator.

describe('scan', () => {
  const noop = operators.scan((_acc, x) => x, null);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('folds values continuously with a reducer and initial value', () => {
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    const reducer = (acc: number, x: number) => acc + x;
    sinks.forEach(fn)(operators.scan(reducer, 0)(input$));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    next(2);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('share', () => {
  const noop = operators.share;
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  it('shares output values between sinks', () => {
    let push = () => {};

    // The wrapped source hands `push` to the test body once it is started
    const source: types.sourceT<any> = operators.share(sink => {
      sink(deriving.start(() => {}));
      push = () => {
        sink(deriving.push([0]));
        sink(deriving.end());
      };
    });

    const fnA = jest.fn();
    const fnB = jest.fn();

    sinks.forEach(fnA)(source);
    sinks.forEach(fnB)(source);
    push();

    expect(fnA).toHaveBeenCalledWith([0]);
    expect(fnB).toHaveBeenCalledWith([0]);
    // Both sinks must receive the very same array instance, not equal copies
    expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
  });
});

describe('skip', () => {
  const noop = operators.skip(0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips a number of values before emitting normally', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skip(1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipUntil', () => {
  const noop = operators.skipUntil(sources.fromValue(null));
  // TODO: passesPassivePull(noop);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until the notifier source emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipUntil(notifier$)(input$));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    notify(null);
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('skipWhile', () => {
  const noop = operators.skipWhile(() => false);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('skips values until one fails a predicate', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source));

    next(1);
    expect(fn).not.toHaveBeenCalled();
    next(2);
    expect(fn).toHaveBeenCalledWith(2);
  });
});

describe('switchMap', () => {
  const noop = operators.switchMap(x => sources.fromValue(x));
  // TODO: passesPassivePull(noop);
  // TODO: passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  // TODO: passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // This synchronous test for switchMap will behave the same as concatMap & mergeMap
  it('emits values from each flattened synchronous source', () => {
    const { source, next, complete } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn);

    next(1);
    next(3);
    complete();

    expect(fn).toHaveBeenCalledTimes(6);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.push(2)],
      [deriving.push(3)],
      [deriving.push(4)],
      [deriving.end()],
    ]);
  });

  // This asynchronous test for switchMap will behave differently than concatMap & mergeMap
  it('emits values from each flattened asynchronous source, one at a time', () => {
    const source = web.delay<number>(4)(sources.fromArray([1, 10]));
    const fn = jest.fn();

    sinks.forEach(fn)(
      operators.switchMap((x: number) => {
        return web.delay(5)(sources.fromArray([x, x * 2]));
      })(source)
    );

    // The first inner source's second value (2) is expected to be missing
    jest.runAllTimers();
    expect(fn.mock.calls).toEqual([
      [1],
      [10],
      [20],
    ]);
  });
});

describe('take', () => {
  const noop = operators.take(10);
  passesPassivePull(noop);
  passesActivePush(noop);
  // TODO: passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  // TODO: take(0) seems to be broken
  const ending = operators.take(1);
  passesCloseAndEnd(ending);

  it('emits values until a maximum is reached', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.take(1)(source)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('takeUntil', () => {
  const noop = operators.takeUntil(sources.never);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeUntil(sources.fromValue(null));
  passesCloseAndEnd(ending);

  it('emits values until a notifier emits', () => {
    const { source: notifier$, next: notify } = sources.makeSubject<number>();
    const { source: input$, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.takeUntil(notifier$)(input$)(fn);
    next(1);

    expect(fn).toHaveBeenCalledTimes(2);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
    ]);

    // Any notifier emission is expected to end the output; a number is used
    // so that it matches the subject's declared type
    notify(1);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(fn.mock.calls[2][0]).toEqual(deriving.end());
  });
});

describe('takeWhile', () => {
  const noop = operators.takeWhile(() => true);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  // TODO: passesSingleStart(noop);
  passesStrictEnd(noop);
  passesAsyncSequence(noop);

  const ending = operators.takeWhile(() => false);
  passesCloseAndEnd(ending);

  it('emits values while a predicate passes for all values', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    // `fn` is used as the raw sink here, so it receives the signals themselves
    operators.takeWhile(x => x < 2)(source)(fn);
    next(1);
    next(2);

    // NOTE(review): two Start signals are asserted here, which is consistent
    // with passesSingleStart being disabled for takeWhile above. This looks
    // like a known defect pinned by the test rather than intended behaviour;
    // confirm before relying on it.
    expect(fn).toHaveBeenCalledTimes(4);
    expect(fn.mock.calls).toEqual([
      [deriving.start(expect.any(Function))],
      [deriving.start(expect.any(Function))],
      [deriving.push(1)],
      [deriving.end()],
    ]);
  });
});

describe('throttle', () => {
  const noop = web.throttle(() => 0);
  passesPassivePull(noop);
  passesActivePush(noop);
  passesSinkClose(noop);
  passesSourceEnd(noop);
  passesSingleStart(noop);
  passesAsyncSequence(noop);

  it('should ignore emissions for a period of time after a value', () => {
    const { source, next } = sources.makeSubject<number>();
    const fn = jest.fn();

    sinks.forEach(fn)(web.throttle(() => 100)(source));

    next(1);
    expect(fn).toHaveBeenCalledWith(1);
    jest.advanceTimersByTime(50);

    // Only 50ms have passed since the first value, so this one is dropped
    next(2);
    expect(fn).toHaveBeenCalledTimes(1);
    jest.advanceTimersByTime(50);

    next(3);
    expect(fn).toHaveBeenCalledWith(3);
  });
});