Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.2.0 42 kB view raw
1import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3import { fromArray } from './sources'; 4 5const identity = <T>(x: T): T => x; 6 7/** Buffers values and emits the array of bufferd values each time a `notifier` Source emits. 8 * 9 * @param notifier - A {@link Source} that releases the current buffer. 10 * @returns An {@link Operator}. 11 * 12 * @remarks 13 * `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source 14 * emits, it will emit an array of all buffered values. 15 * 16 * This can be used to group values over time. A buffer will only be emitted when it contains any 17 * values. 18 * 19 * @example 20 * ```ts 21 * pipe( 22 * interval(50), 23 * buffer(interval(100)), 24 * subscribe(x => { 25 * console.log(text); // logs: [0], [1, 2], [3, 4]... 26 * }) 27 * ); 28 * ``` 29 */ 30export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> { 31 return source => sink => { 32 let buffer: T[] = []; 33 let sourceTalkback = talkbackPlaceholder; 34 let notifierTalkback = talkbackPlaceholder; 35 let pulled = false; 36 let ended = false; 37 source(signal => { 38 if (ended) { 39 /*noop*/ 40 } else if (signal === SignalKind.End) { 41 ended = true; 42 notifierTalkback(TalkbackKind.Close); 43 if (buffer.length) sink(push(buffer)); 44 sink(SignalKind.End); 45 } else if (signal.tag === SignalKind.Start) { 46 sourceTalkback = signal[0]; 47 notifier(signal => { 48 if (ended) { 49 /*noop*/ 50 } else if (signal === SignalKind.End) { 51 ended = true; 52 sourceTalkback(TalkbackKind.Close); 53 if (buffer.length) sink(push(buffer)); 54 sink(SignalKind.End); 55 } else if (signal.tag === SignalKind.Start) { 56 notifierTalkback = signal[0]; 57 } else if (buffer.length) { 58 const signal = push(buffer); 59 buffer = []; 60 sink(signal); 61 } 62 }); 63 } else { 64 buffer.push(signal[0]); 65 if (!pulled) { 66 pulled = true; 67 sourceTalkback(TalkbackKind.Pull); 68 notifierTalkback(TalkbackKind.Pull); 69 } else { 70 pulled = false; 71 } 72 } 73 }); 74 sink( 75 start(signal => { 76 if (signal === TalkbackKind.Close && !ended) { 77 ended = true; 78 sourceTalkback(TalkbackKind.Close); 79 notifierTalkback(TalkbackKind.Close); 80 } else if (!ended && !pulled) { 81 pulled = true; 82 sourceTalkback(TalkbackKind.Pull); 83 notifierTalkback(TalkbackKind.Pull); 84 } 85 }) 86 ); 87 }; 88} 89 90/** Emits in order from the Sources returned by a mapping function per value of the Source. 91 * 92 * @param map - A function returning a {@link Source} per value. 93 * @returns An {@link Operator}. 94 * 95 * @remarks 96 * `concatMap` accepts a mapping function which must return a {@link Source} per value. 97 * The output {@link Source} will emit values from each Source the function returned, in order, 98 * queuing sources that aren't yet active. 99 * 100 * This can be used to issue multiple values per emission of an input {@link Source}, while keeping 101 * the order of their outputs consistent. 102 * 103 * @example 104 * ```ts 105 * pipe( 106 * fromArray([1, 2]), 107 * concatMap(x => fromArray([x, x * 2])), 108 * subscribe(x => { 109 * console.log(text); // logs: 1, 2, 2, 4 110 * }) 111 * ); 112 * ``` 113 */ 114export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 115 return source => sink => { 116 const inputQueue: In[] = []; 117 let outerTalkback = talkbackPlaceholder; 118 let innerTalkback = talkbackPlaceholder; 119 let outerPulled = false; 120 let innerPulled = false; 121 let innerActive = false; 122 let ended = false; 123 function applyInnerSource(innerSource: Source<Out>): void { 124 innerActive = true; 125 innerSource(signal => { 126 if (signal === SignalKind.End) { 127 if (innerActive) { 128 innerActive = false; 129 if (inputQueue.length) { 130 applyInnerSource(map(inputQueue.shift()!)); 131 } else if (ended) { 132 sink(SignalKind.End); 133 } else if (!outerPulled) { 134 outerPulled = true; 135 outerTalkback(TalkbackKind.Pull); 136 } 137 } 138 } else if (signal.tag === SignalKind.Start) { 139 innerPulled = false; 140 (innerTalkback = signal[0])(TalkbackKind.Pull); 141 } else if (innerActive) { 142 sink(signal); 143 if (innerPulled) { 144 innerPulled = false; 145 } else { 146 innerTalkback(TalkbackKind.Pull); 147 } 148 } 149 }); 150 } 151 source(signal => { 152 if (ended) { 153 /*noop*/ 154 } else if (signal === SignalKind.End) { 155 ended = true; 156 if (!innerActive && !inputQueue.length) sink(SignalKind.End); 157 } else if (signal.tag === SignalKind.Start) { 158 outerTalkback = signal[0]; 159 } else { 160 outerPulled = false; 161 if (innerActive) { 162 inputQueue.push(signal[0]); 163 } else { 164 applyInnerSource(map(signal[0])); 165 } 166 } 167 }); 168 sink( 169 start(signal => { 170 if (signal === TalkbackKind.Close) { 171 if (!ended) { 172 ended = true; 173 outerTalkback(TalkbackKind.Close); 174 } 175 if (innerActive) { 176 innerActive = false; 177 innerTalkback(TalkbackKind.Close); 178 } 179 } else { 180 if (!ended && !outerPulled) { 181 outerPulled = true; 182 outerTalkback(TalkbackKind.Pull); 183 } 184 if (innerActive && !innerPulled) { 185 innerPulled = true; 186 innerTalkback(TalkbackKind.Pull); 187 } 188 } 189 }) 190 ); 191 }; 192} 193 194/** Flattens a Source emitting Sources into a single Source emitting the inner values in order. 195 * 196 * @see {@link concatMap} which this helper uses and instead accept a mapping function. 197 * @param source - An {@link Source} emitting {@link Source | Sources}. 198 * @returns A {@link Source} emitting values from the inner Sources. 199 * 200 * @remarks 201 * `concatAll` accepts a {@link Source} emitting {@link Source | Sources}. 202 * The output {@link Source} will emit values from each Source, in order, queuing sources that 203 * aren't yet active. 204 * 205 * @example 206 * ```ts 207 * pipe( 208 * fromArray([ 209 * fromArray([1, 2]), 210 * fromArray([3, 4]), 211 * ]), 212 * concatAll, 213 * subscribe(x => { 214 * console.log(text); // logs: 1, 2, 3, 4 215 * }) 216 * ); 217 * ``` 218 */ 219export function concatAll<T>(source: Source<Source<T>>): Source<T> { 220 return concatMap<Source<T>, T>(identity)(source); 221} 222 223/** Emits values from the passed sources in order. 224 * 225 * @param sources - An array of {@link Source | Sources}. 226 * @returns A {@link Source} emitting values from the input Sources. 227 * 228 * @remarks 229 * `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting 230 * with the first one and continuing to the next only when the prior source ended. 231 * 232 * This can be used to issue combine sources while keeping the order of their values intact. 233 * 234 * @example 235 * ```ts 236 * pipe( 237 * concat([ 238 * fromArray([1, 2]), 239 * fromArray([3, 4]), 240 * ]), 241 * subscribe(x => { 242 * console.log(text); // logs: 1, 2, 3, 4 243 * }) 244 * ); 245 * ``` 246 */ 247export function concat<T>(sources: Source<T>[]): Source<T> { 248 return concatAll(fromArray(sources)); 249} 250 251/** Filters out emitted values for which the passed predicate function returns `false`. 252 * 253 * @param predicate - A function returning a boolean per value. 254 * @returns An {@link Operator}. 255 * 256 * @remarks 257 * `filter` will omit values from the {@link Source} for which the passed `predicate` function 258 * returns `false`. 259 * 260 * @example 261 * ```ts 262 * pipe( 263 * fromArray([1, 2, 3]), 264 * filter(x => x % 2 === 0), 265 * subscribe(x => { 266 * console.log(text); // logs: 2 267 * }) 268 * ); 269 * ``` 270 */ 271export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> { 272 return source => sink => { 273 let talkback = talkbackPlaceholder; 274 source(signal => { 275 if (signal === SignalKind.End) { 276 sink(SignalKind.End); 277 } else if (signal.tag === SignalKind.Start) { 278 talkback = signal[0]; 279 sink(signal); 280 } else if (!predicate(signal[0])) { 281 talkback(TalkbackKind.Pull); 282 } else { 283 sink(signal); 284 } 285 }); 286 }; 287} 288 289/** Maps emitted values using the passed mapping function. 290 * 291 * @param map - A function returning transforming the {@link Source | Source's} values. 292 * @returns An {@link Operator}. 293 * 294 * @remarks 295 * `map` accepts a transform function and calls it on each emitted value. It then emits 296 * the values returned by the transform function instead. 297 * 298 * @example 299 * ```ts 300 * pipe( 301 * fromArray([1, 2, 3]), 302 * map(x => x * 2), 303 * subscribe(x => { 304 * console.log(text); // logs: 2, 4, 6 305 * }) 306 * ); 307 * ``` 308 */ 309export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> { 310 return source => sink => 311 source(signal => { 312 if (signal === SignalKind.End || signal.tag === SignalKind.Start) { 313 sink(signal); 314 } else { 315 sink(push(map(signal[0]))); 316 } 317 }); 318} 319 320/** Emits from the Sources returned by a mapping function per value of the Source. 321 * 322 * @param map - A function returning a {@link Source} per value. 323 * @returns An {@link Operator}. 324 * 325 * @remarks 326 * `mergeMap` accepts a mapping function which must return a {@link Source} per value. 327 * The output {@link Source} will emit values from all {@link Source | Sources} the mapping function 328 * returned. 329 * 330 * This can be used to issue multiple values per emission of an input {@link Source}, essentially 331 * multiplexing all values to multiple Sources. 332 * 333 * @example 334 * ```ts 335 * pipe( 336 * interval(50), 337 * mergeMap(x => pipe( 338 * fromValue(x), 339 * delay(100) 340 * )), 341 * subscribe(x => { 342 * console.log(text); // logs: 0, 1, 2... 343 * }) 344 * ); 345 * ``` 346 */ 347export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 348 return source => sink => { 349 let innerTalkbacks: TalkbackFn[] = []; 350 let outerTalkback = talkbackPlaceholder; 351 let outerPulled = false; 352 let ended = false; 353 function applyInnerSource(innerSource: Source<Out>): void { 354 let talkback = talkbackPlaceholder; 355 innerSource(signal => { 356 if (signal === SignalKind.End) { 357 if (innerTalkbacks.length) { 358 const index = innerTalkbacks.indexOf(talkback); 359 if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1); 360 if (!innerTalkbacks.length) { 361 if (ended) { 362 sink(SignalKind.End); 363 } else if (!outerPulled) { 364 outerPulled = true; 365 outerTalkback(TalkbackKind.Pull); 366 } 367 } 368 } 369 } else if (signal.tag === SignalKind.Start) { 370 innerTalkbacks.push((talkback = signal[0])); 371 talkback(TalkbackKind.Pull); 372 } else if (innerTalkbacks.length) { 373 sink(signal); 374 talkback(TalkbackKind.Pull); 375 } 376 }); 377 } 378 source(signal => { 379 if (ended) { 380 /*noop*/ 381 } else if (signal === SignalKind.End) { 382 ended = true; 383 if (!innerTalkbacks.length) sink(SignalKind.End); 384 } else if (signal.tag === SignalKind.Start) { 385 outerTalkback = signal[0]; 386 } else { 387 outerPulled = false; 388 applyInnerSource(map(signal[0])); 389 if (!outerPulled) { 390 outerPulled = true; 391 outerTalkback(TalkbackKind.Pull); 392 } 393 } 394 }); 395 sink( 396 start(signal => { 397 if (signal === TalkbackKind.Close) { 398 if (!ended) { 399 ended = true; 400 outerTalkback(TalkbackKind.Close); 401 } 402 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 403 a[i](TalkbackKind.Close); 404 innerTalkbacks.length = 0; 405 } else { 406 if (!ended && !outerPulled) { 407 outerPulled = true; 408 outerTalkback(TalkbackKind.Pull); 409 } else { 410 outerPulled = false; 411 } 412 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 413 a[i](TalkbackKind.Pull); 414 } 415 }) 416 ); 417 }; 418} 419 420/** Flattens a Source emitting Sources into a single Source emitting the inner values. 421 * 422 * @see {@link mergeMap} which this helper uses and instead accept a mapping function. 423 * @param source - An {@link Source} emitting {@link Source | Sources}. 424 * @returns A {@link Source} emitting values from the inner Sources. 425 * 426 * @remarks 427 * `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe 428 * to each incoming source immediately and start passing its emitted values through. 429 * 430 * @example 431 * ```ts 432 * pipe( 433 * fromArray([ 434 * interval(50), 435 * interval(100), 436 * ]), 437 * mergeAll, 438 * subscribe(x => { 439 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 440 * }) 441 * ); 442 * ``` 443 */ 444export function mergeAll<T>(source: Source<Source<T>>): Source<T> { 445 return mergeMap<Source<T>, T>(identity)(source); 446} 447 448/** Emits values from the passed sources simultaneously. 449 * 450 * @param sources - An array of {@link Source | Sources}. 451 * @returns A {@link Source} emitting values from the input Sources. 452 * 453 * @remarks 454 * `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing 455 * through all their emitted values simultaneously. 456 * 457 * This can be used to interleave the values of multiple sources. 458 * 459 * @example 460 * ```ts 461 * pipe( 462 * merge([ 463 * interval(50), 464 * interval(100), 465 * ]), 466 * subscribe(x => { 467 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 468 * }) 469 * ); 470 * ``` 471 */ 472export function merge<T>(sources: Source<T>[]): Source<T> { 473 return mergeAll(fromArray(sources)); 474} 475 476/** Calls the passed callback function when the Source ends or is closed. 477 * 478 * @param callback - A function that is called when the {@link Source} ends. 479 * @returns An {@link Operator}. 480 * 481 * @remarks 482 * `onEnd` accepts a callback which is called when the {@link Source} either ends 483 * or is closed. 484 * 485 * This operator can be used to add side-effects to a Source. 486 * 487 * @example 488 * ```ts 489 * pipe( 490 * fromArray([1, 2, 3]), 491 * take(1), 492 * onEnd(() => { 493 * console.log('end'); 494 * }), 495 * publish 496 * ); 497 * ``` 498 */ 499export function onEnd<T>(callback: () => void): Operator<T, T> { 500 return source => sink => { 501 let ended = false; 502 source(signal => { 503 if (ended) { 504 /*noop*/ 505 } else if (signal === SignalKind.End) { 506 ended = true; 507 sink(SignalKind.End); 508 callback(); 509 } else if (signal.tag === SignalKind.Start) { 510 const talkback = signal[0]; 511 sink( 512 start(signal => { 513 if (signal === TalkbackKind.Close) { 514 ended = true; 515 talkback(TalkbackKind.Close); 516 callback(); 517 } else { 518 talkback(signal); 519 } 520 }) 521 ); 522 } else { 523 sink(signal); 524 } 525 }); 526 }; 527} 528 529/** Calls the passed callback function when the Source emits a value. 530 * 531 * @param callback - A function that is called with each value the {@link Source} emits. 532 * @returns An {@link Operator}. 533 * 534 * @remarks 535 * `onPush` accepts a callback which is called for every emitted value of 536 * the {@link Source}. 537 * 538 * This operator can be used to add side-effects to a Source. 539 * 540 * @example 541 * ```ts 542 * pipe( 543 * fromArray([1, 2, 3]), 544 * onPush(value => { 545 * console.log(value); // logs: 1, 2, 3 546 * }), 547 * publish 548 * ); 549 * ``` 550 */ 551export function onPush<T>(callback: (value: T) => void): Operator<T, T> { 552 return source => sink => { 553 let ended = false; 554 source(signal => { 555 if (ended) { 556 /*noop*/ 557 } else if (signal === SignalKind.End) { 558 ended = true; 559 sink(SignalKind.End); 560 } else if (signal.tag === SignalKind.Start) { 561 const talkback = signal[0]; 562 sink( 563 start(signal => { 564 if (signal === TalkbackKind.Close) ended = true; 565 talkback(signal); 566 }) 567 ); 568 } else { 569 callback(signal[0]); 570 sink(signal); 571 } 572 }); 573 }; 574} 575 576/** Calls the passed callback function when the Source starts. 577 * 578 * @param callback - A function that is called when the {@link Source} is started. 579 * @returns An {@link Operator}. 580 * 581 * @remarks 582 * `onPush` accepts a callback which is called for every emitted value of 583 * the {@link Source}. 584 * 585 * This operator can be used to add side-effects to a Source. 586 * Specifically, it's useful to add a side-effect for a Source that triggers only once 587 * the {@link Source} is used and started. 588 * 589 * @example 590 * ```ts 591 * pipe( 592 * fromArray([1, 2, 3]), 593 * onStart(() => { 594 * console.log('start'); 595 * }), 596 * publish 597 * ); 598 * ``` 599 */ 600export function onStart<T>(callback: () => void): Operator<T, T> { 601 return source => sink => 602 source(signal => { 603 if (signal === SignalKind.End) { 604 sink(SignalKind.End); 605 } else if (signal.tag === SignalKind.Start) { 606 sink(signal); 607 callback(); 608 } else { 609 sink(signal); 610 } 611 }); 612} 613 614/** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value. 615 * 616 * @param notifier - A {@link Source} that triggers the last value to be emitted. 617 * @returns An {@link Operator}. 618 * 619 * @remarks 620 * `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source 621 * emits, it will emit the latest value. 622 * 623 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 624 * too frequently. 625 * 626 * {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be 627 * ignored. 628 * 629 * @example 630 * ```ts 631 * pipe( 632 * interval(50), 633 * sample(interval(100)), 634 * subscribe(x => { 635 * console.log(text); // logs: 0, 2, 4... 636 * }) 637 * ); 638 * ``` 639 */ 640export function sample<S, T>(notifier: Source<S>): Operator<T, T> { 641 return source => sink => { 642 let sourceTalkback = talkbackPlaceholder; 643 let notifierTalkback = talkbackPlaceholder; 644 let value: T | void; 645 let pulled = false; 646 let ended = false; 647 source(signal => { 648 if (ended) { 649 /*noop*/ 650 } else if (signal === SignalKind.End) { 651 ended = true; 652 notifierTalkback(TalkbackKind.Close); 653 sink(SignalKind.End); 654 } else if (signal.tag === SignalKind.Start) { 655 sourceTalkback = signal[0]; 656 } else { 657 value = signal[0]; 658 if (!pulled) { 659 pulled = true; 660 notifierTalkback(TalkbackKind.Pull); 661 sourceTalkback(TalkbackKind.Pull); 662 } else { 663 pulled = false; 664 } 665 } 666 }); 667 notifier(signal => { 668 if (ended) { 669 /*noop*/ 670 } else if (signal === SignalKind.End) { 671 ended = true; 672 sourceTalkback(TalkbackKind.Close); 673 sink(SignalKind.End); 674 } else if (signal.tag === SignalKind.Start) { 675 notifierTalkback = signal[0]; 676 } else if (value !== undefined) { 677 const signal = push(value); 678 value = undefined; 679 sink(signal); 680 } 681 }); 682 sink( 683 start(signal => { 684 if (signal === TalkbackKind.Close && !ended) { 685 ended = true; 686 sourceTalkback(TalkbackKind.Close); 687 notifierTalkback(TalkbackKind.Close); 688 } else if (!ended && !pulled) { 689 pulled = true; 690 sourceTalkback(TalkbackKind.Pull); 691 notifierTalkback(TalkbackKind.Pull); 692 } 693 }) 694 ); 695 }; 696} 697 698/** Maps emitted values using the passed reducer function. 699 * 700 * @param reducer - A function called with the last value by the `reducer` and the emitted value. 701 * @param seed - The initial value that is passed to the `reducer`. 702 * @returns An {@link Operator}. 703 * 704 * @remarks 705 * `scan` accepts a reducer function and a seed value. The reducer will be called initially with the 706 * seed value and the first emitted value. The {@link Source} will then emit the value returned by 707 * the reducer function. Subsequently, the `reducer` is called with the last value the `reducer` 708 * returned and the emitted value. 709 * 710 * This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits 711 * each value of the reducer. 712 * 713 * @example 714 * ```ts 715 * pipe( 716 * fromArray([1, 2, 3]), 717 * scan((acc, x) => acc + x, 0), 718 * subscribe(x => { 719 * console.log(text); // logs: 1, 3, 6 720 * }) 721 * ); 722 * ``` 723 */ 724export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> { 725 return source => sink => { 726 let acc = seed; 727 source(signal => { 728 if (signal === SignalKind.End) { 729 sink(SignalKind.End); 730 } else if (signal.tag === SignalKind.Start) { 731 sink(signal); 732 } else { 733 sink(push((acc = reducer(acc, signal[0])))); 734 } 735 }); 736 }; 737} 738 739/** Shares one underlying subscription to the Source between all Sinks. 740 * 741 * @param source - A {@link Source} that should be shared. 742 * @returns A shared {@link Source}. 743 * 744 * @remarks 745 * `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it 746 * will share one subscription to the input source. This allows side-effects on the input 747 * {@link Source} to only be triggerd once. 748 */ 749export function share<T>(source: Source<T>): Source<T> { 750 let sinks: Sink<T>[] = []; 751 let talkback = talkbackPlaceholder; 752 let gotSignal = false; 753 return sink => { 754 sinks.push(sink); 755 if (sinks.length === 1) { 756 source(signal => { 757 if (signal === SignalKind.End) { 758 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End); 759 sinks.length = 0; 760 } else if (signal.tag === SignalKind.Start) { 761 talkback = signal[0]; 762 } else { 763 gotSignal = false; 764 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal); 765 } 766 }); 767 } 768 sink( 769 start(signal => { 770 if (signal === TalkbackKind.Close) { 771 const index = sinks.indexOf(sink); 772 if (index > -1) (sinks = sinks.slice()).splice(index, 1); 773 if (!sinks.length) talkback(TalkbackKind.Close); 774 } else if (!gotSignal) { 775 gotSignal = true; 776 talkback(TalkbackKind.Pull); 777 } 778 }) 779 ); 780 }; 781} 782 783/** Omits `wait` amount of values from the Source and then runs as usual. 784 * 785 * @param wait - The number of values to be omitted. 786 * @returns An {@link Operator}. 787 * 788 * @remarks 789 * `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards. 790 * This essentially skips a given number of values on the input {@link Source}. 791 * 792 * @example 793 * ```ts 794 * pipe( 795 * fromArray([1, 2, 3]), 796 * skip(2), 797 * subscribe(x => { 798 * console.log(text); // logs: 3 799 * }) 800 * ); 801 * ``` 802 */ 803export function skip<T>(wait: number): Operator<T, T> { 804 return source => sink => { 805 let talkback = talkbackPlaceholder; 806 let rest = wait; 807 source(signal => { 808 if (signal === SignalKind.End) { 809 sink(SignalKind.End); 810 } else if (signal.tag === SignalKind.Start) { 811 talkback = signal[0]; 812 sink(signal); 813 } else if (rest-- > 0) { 814 talkback(TalkbackKind.Pull); 815 } else { 816 sink(signal); 817 } 818 }); 819 }; 820} 821 822/** Omits values from an input Source until a notifier Source emits a value. 823 * 824 * @param notifier - A {@link Source} that starts the operator's sent values. 825 * @returns An {@link Operator}. 826 * 827 * @remarks 828 * `skipUntil` will omit all values from the input {@link Source} until the `notifier` 829 * Source emits a value of its own. It'll then start passing values from the Source through. 830 * 831 * @example 832 * ```ts 833 * pipe( 834 * interval(50), 835 * skipUntil(interval(150)), 836 * subscribe(x => { 837 * console.log(text); // logs: 2, 3... 838 * }) 839 * ); 840 * ``` 841 */ 842export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> { 843 return source => sink => { 844 let sourceTalkback = talkbackPlaceholder; 845 let notifierTalkback = talkbackPlaceholder; 846 let skip = true; 847 let pulled = false; 848 let ended = false; 849 source(signal => { 850 if (ended) { 851 /*noop*/ 852 } else if (signal === SignalKind.End) { 853 ended = true; 854 if (skip) notifierTalkback(TalkbackKind.Close); 855 sink(SignalKind.End); 856 } else if (signal.tag === SignalKind.Start) { 857 sourceTalkback = signal[0]; 858 notifier(signal => { 859 if (signal === SignalKind.End) { 860 if (skip) { 861 ended = true; 862 sourceTalkback(TalkbackKind.Close); 863 } 864 } else if (signal.tag === SignalKind.Start) { 865 (notifierTalkback = signal[0])(TalkbackKind.Pull); 866 } else { 867 skip = false; 868 notifierTalkback(TalkbackKind.Close); 869 } 870 }); 871 } else if (!skip) { 872 pulled = false; 873 sink(signal); 874 } else if (!pulled) { 875 pulled = true; 876 sourceTalkback(TalkbackKind.Pull); 877 notifierTalkback(TalkbackKind.Pull); 878 } else { 879 pulled = false; 880 } 881 }); 882 sink( 883 start(signal => { 884 if (signal === TalkbackKind.Close && !ended) { 885 ended = true; 886 sourceTalkback(TalkbackKind.Close); 887 if (skip) notifierTalkback(TalkbackKind.Close); 888 } else if (!ended && !pulled) { 889 pulled = true; 890 if (skip) notifierTalkback(TalkbackKind.Pull); 891 sourceTalkback(TalkbackKind.Pull); 892 } 893 }) 894 ); 895 }; 896} 897 898/** Omits values from an input Source until a predicate function returns `false`. 899 * 900 * @param predicate - A function returning a boolean per value. 901 * @returns An {@link Operator}. 902 * 903 * @remarks 904 * `skipWhile` will omit all values from the input {@link Source} until the `predicate` 905 * function returns `false`. When the `predicate` function returns `false`, the Source's values will 906 * be passed through. 907 * 908 * @example 909 * ```ts 910 * pipe( 911 * fromArray([1, 2, 3]), 912 * skipWhile(x => x < 2), 913 * subscribe(x => { 914 * console.log(text); // logs: 2, 3 915 * }) 916 * ); 917 * ``` 918 */ 919export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 920 return source => sink => { 921 let talkback = talkbackPlaceholder; 922 let skip = true; 923 source(signal => { 924 if (signal === SignalKind.End) { 925 sink(SignalKind.End); 926 } else if (signal.tag === SignalKind.Start) { 927 talkback = signal[0]; 928 sink(signal); 929 } else if (skip) { 930 if (predicate(signal[0])) { 931 talkback(TalkbackKind.Pull); 932 } else { 933 skip = false; 934 sink(signal); 935 } 936 } else { 937 sink(signal); 938 } 939 }); 940 }; 941} 942 943/** Emits from the latest Source returned by a mapping function per value of the Source. 944 * 945 * @param map - A function returning a {@link Source} per value. 946 * @returns An {@link Operator}. 947 * 948 * @remarks 949 * `switchMap` accepts a mapping function which must return a {@link Source} per value. 950 * The output {@link Source} will emit values from the latest Source the mapping function 951 * returned. If a value is emitted while the last returned Source is still active, the prior Source 952 * will be closed. 953 * 954 * This can be used to issue multiple values per emission of an input {@link Source}, while only 955 * letting one of these sub-Sources be active at a time. 956 * 957 * @example 958 * ```ts 959 * pipe( 960 * interval(100), 961 * switchMap(() => interval(50)), 962 * subscribe(x => { 963 * console.log(text); // logs: 0, 0, 0... 964 * }) 965 * ); 966 * ``` 967 */ 968export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 969 return source => sink => { 970 let outerTalkback = talkbackPlaceholder; 971 let innerTalkback = talkbackPlaceholder; 972 let outerPulled = false; 973 let innerPulled = false; 974 let innerActive = false; 975 let ended = false; 976 function applyInnerSource(innerSource: Source<Out>): void { 977 innerActive = true; 978 innerSource(signal => { 979 if (!innerActive) { 980 /*noop*/ 981 } else if (signal === SignalKind.End) { 982 innerActive = false; 983 if (ended) { 984 sink(SignalKind.End); 985 } else if (!outerPulled) { 986 outerPulled = true; 987 outerTalkback(TalkbackKind.Pull); 988 } 989 } else if (signal.tag === SignalKind.Start) { 990 innerPulled = false; 991 (innerTalkback = signal[0])(TalkbackKind.Pull); 992 } else { 993 sink(signal); 994 if (!innerPulled) { 995 innerTalkback(TalkbackKind.Pull); 996 } else { 997 innerPulled = false; 998 } 999 } 1000 }); 1001 } 1002 source(signal => { 1003 if (ended) { 1004 /*noop*/ 1005 } else if (signal === SignalKind.End) { 1006 ended = true; 1007 if (!innerActive) sink(SignalKind.End); 1008 } else if (signal.tag === SignalKind.Start) { 1009 outerTalkback = signal[0]; 1010 } else { 1011 if (innerActive) { 1012 innerTalkback(TalkbackKind.Close); 1013 innerTalkback = talkbackPlaceholder; 1014 } 1015 if (!outerPulled) { 1016 outerPulled = true; 1017 outerTalkback(TalkbackKind.Pull); 1018 } else { 1019 outerPulled = false; 1020 } 1021 applyInnerSource(map(signal[0])); 1022 } 1023 }); 1024 sink( 1025 start(signal => { 1026 if (signal === TalkbackKind.Close) { 1027 if (!ended) { 1028 ended = true; 1029 outerTalkback(TalkbackKind.Close); 1030 } 1031 if (innerActive) { 1032 innerActive = false; 1033 innerTalkback(TalkbackKind.Close); 1034 } 1035 } else { 1036 if (!ended && !outerPulled) { 1037 outerPulled = true; 1038 outerTalkback(TalkbackKind.Pull); 1039 } 1040 if (innerActive && !innerPulled) { 1041 innerPulled = true; 1042 innerTalkback(TalkbackKind.Pull); 1043 } 1044 } 1045 }) 1046 ); 1047 }; 1048} 1049 1050/** Flattens a Source emitting Sources into a single Source emitting the inner values. 1051 * 1052 * @see {@link switchMap} which this helper uses and instead accept a mapping function. 1053 * @param source - An {@link Source} emitting {@link Source | Sources}. 1054 * @returns A {@link Source} emitting values from the inner Sources. 1055 * 1056 * @remarks 1057 * `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it 1058 * receives a {@link Source} it will close its prior subscription and subscribe to the new Source 1059 * instead, passing through its values. 1060 * 1061 * @example 1062 * ```ts 1063 * pipe( 1064 * interval(100), 1065 * map(() => interval(50)), 1066 * switchAll, 1067 * subscribe(x => { 1068 * console.log(text); // logs: 0, 0, 0... 1069 * }) 1070 * ); 1071 * ``` 1072 */ 1073export function switchAll<T>(source: Source<Source<T>>): Source<T> { 1074 return switchMap<Source<T>, T>(identity)(source); 1075} 1076 1077/** Emits `max` values from the Source and then ends. 1078 * 1079 * @param max - The maximum number of values emitted. 1080 * @returns An {@link Operator}. 1081 * 1082 * @remarks 1083 * `take` will issue all values as normal until the `max` number of emitted values has been reached. 1084 * It will then end and close the {@link Source}. 1085 * 1086 * @example 1087 * ```ts 1088 * pipe( 1089 * fromArray([1, 2, 3]), 1090 * take(2), 1091 * subscribe(x => { 1092 * console.log(text); // logs: 1, 2 1093 * }) 1094 * ); 1095 * ``` 1096 */ 1097export function take<T>(max: number): Operator<T, T> { 1098 return source => sink => { 1099 let talkback = talkbackPlaceholder; 1100 let ended = false; 1101 let taken = 0; 1102 source(signal => { 1103 if (ended) { 1104 /*noop*/ 1105 } else if (signal === SignalKind.End) { 1106 ended = true; 1107 sink(SignalKind.End); 1108 } else if (signal.tag === SignalKind.Start) { 1109 if (max <= 0) { 1110 ended = true; 1111 sink(SignalKind.End); 1112 signal[0](TalkbackKind.Close); 1113 } else { 1114 talkback = signal[0]; 1115 } 1116 } else if (taken++ < max) { 1117 sink(signal); 1118 if (!ended && taken >= max) { 1119 ended = true; 1120 sink(SignalKind.End); 1121 talkback(TalkbackKind.Close); 1122 } 1123 } else { 1124 sink(signal); 1125 } 1126 }); 1127 sink( 1128 start(signal => { 1129 if (signal === TalkbackKind.Close && !ended) { 1130 ended = true; 1131 talkback(TalkbackKind.Close); 1132 } else if (signal === TalkbackKind.Pull && !ended && taken < max) { 1133 talkback(TalkbackKind.Pull); 1134 } 1135 }) 1136 ); 1137 }; 1138} 1139 1140/** Buffers the `max` last values of the Source and emits them once the Source ends. 1141 * 1142 * @param max - The maximum number of values buffered. 1143 * @returns An {@link Operator}. 1144 * 1145 * @remarks 1146 * `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It 1147 * will only emit values stored in the buffer once the {@link Source} ends. 1148 * 1149 * All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would 1150 * synchronously. 1151 * 1152 * @example 1153 * ```ts 1154 * pipe( 1155 * fromArray([1, 2, 3]), 1156 * takeLast(1), 1157 * subscribe(x => { 1158 * console.log(text); // logs: 3 1159 * }) 1160 * ); 1161 * ``` 1162 */ 1163export function takeLast<T>(max: number): Operator<T, T> { 1164 return source => sink => { 1165 const queue: T[] = []; 1166 let talkback = talkbackPlaceholder; 1167 source(signal => { 1168 if (signal === SignalKind.End) { 1169 fromArray(queue)(sink); 1170 } else if (signal.tag === SignalKind.Start) { 1171 if (max <= 0) { 1172 signal[0](TalkbackKind.Close); 1173 fromArray(queue)(sink); 1174 } else { 1175 (talkback = signal[0])(TalkbackKind.Pull); 1176 } 1177 } else { 1178 if (queue.length >= max && max) queue.shift(); 1179 queue.push(signal[0]); 1180 talkback(TalkbackKind.Pull); 1181 } 1182 }); 1183 }; 1184} 1185 1186/** Takes values from an input Source until a notifier Source emits a value. 1187 * 1188 * @param notifier - A {@link Source} that stops the operator's sent values. 1189 * @returns An {@link Operator}. 1190 * 1191 * @remarks 1192 * `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier` 1193 * Source emits a value of its own. It'll then close the {@link Source}. 1194 * 1195 * @example 1196 * ```ts 1197 * pipe( 1198 * interval(50), 1199 * takeUntil(interval(150)), 1200 * subscribe(x => { 1201 * console.log(text); // logs: 0, 1 1202 * }) 1203 * ); 1204 * ``` 1205 */ 1206export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> { 1207 return source => sink => { 1208 let sourceTalkback = talkbackPlaceholder; 1209 let notifierTalkback = talkbackPlaceholder; 1210 let ended = false; 1211 source(signal => { 1212 if (ended) { 1213 /*noop*/ 1214 } else if (signal === SignalKind.End) { 1215 ended = true; 1216 notifierTalkback(TalkbackKind.Close); 1217 sink(SignalKind.End); 1218 } else if (signal.tag === SignalKind.Start) { 1219 sourceTalkback = signal[0]; 1220 notifier(signal => { 1221 if (signal === SignalKind.End) { 1222 /*noop*/ 1223 } else if (signal.tag === SignalKind.Start) { 1224 (notifierTalkback = signal[0])(TalkbackKind.Pull); 1225 } else { 1226 ended = true; 1227 notifierTalkback(TalkbackKind.Close); 1228 sourceTalkback(TalkbackKind.Close); 1229 sink(SignalKind.End); 1230 } 1231 }); 1232 } else { 1233 sink(signal); 1234 } 1235 }); 1236 sink( 1237 start(signal => { 1238 if (signal === TalkbackKind.Close && !ended) { 1239 ended = true; 1240 sourceTalkback(TalkbackKind.Close); 1241 notifierTalkback(TalkbackKind.Close); 1242 } else if (!ended) { 1243 sourceTalkback(TalkbackKind.Pull); 1244 } 1245 }) 1246 ); 1247 }; 1248} 1249 1250/** Takes values from an input Source until a predicate function returns `false`. 1251 * 1252 * @param predicate - A function returning a boolean per value. 1253 * @returns An {@link Operator}. 1254 * 1255 * @remarks 1256 * `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate` 1257 * function returns `false`. When the `predicate` function returns `false`, the current value is 1258 * omitted and the {@link Source} is closed. 1259 * 1260 * @example 1261 * ```ts 1262 * pipe( 1263 * fromArray([1, 2, 3]), 1264 * takeWhile(x => x < 2), 1265 * subscribe(x => { 1266 * console.log(text); // logs: 1 1267 * }) 1268 * ); 1269 * ``` 1270 */ 1271export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 1272 return source => sink => { 1273 let talkback = talkbackPlaceholder; 1274 let ended = false; 1275 source(signal => { 1276 if (ended) { 1277 /*noop*/ 1278 } else if (signal === SignalKind.End) { 1279 ended = true; 1280 sink(SignalKind.End); 1281 } else if (signal.tag === SignalKind.Start) { 1282 talkback = signal[0]; 1283 sink(signal); 1284 } else if (!predicate(signal[0])) { 1285 ended = true; 1286 sink(SignalKind.End); 1287 talkback(TalkbackKind.Close); 1288 } else { 1289 sink(signal); 1290 } 1291 }); 1292 }; 1293} 1294 1295/** Debounces a Source by omitting values until a given timeframe has passed. 1296 * 1297 * @param timing - A function returning a debounce time (ms) per emitted value. 1298 * @returns An {@link Operator}. 1299 * 1300 * @remarks 1301 * `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted 1302 * value. All emitted values issued by the {@link Source} during the returned time will be omitted 1303 * until the time has passed. 1304 * 1305 * Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a 1306 * value is let through. 1307 * 1308 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 1309 * too frequently. 1310 * 1311 * @example 1312 * ```ts 1313 * pipe( 1314 * interval(50), 1315 * debounce(() => 100), 1316 * subscribe(x => { 1317 * console.log(text); // never logs any value 1318 * }) 1319 * ); 1320 * ``` 1321 */ 1322export function debounce<T>(timing: (value: T) => number): Operator<T, T> { 1323 return source => sink => { 1324 let id: any | void; 1325 let deferredEnded = false; 1326 let ended = false; 1327 source(signal => { 1328 if (ended) { 1329 /*noop*/ 1330 } else if (signal === SignalKind.End) { 1331 ended = true; 1332 if (id) { 1333 deferredEnded = true; 1334 } else { 1335 sink(SignalKind.End); 1336 } 1337 } else if (signal.tag === SignalKind.Start) { 1338 const talkback = signal[0]; 1339 sink( 1340 start(signal => { 1341 if (signal === TalkbackKind.Close && !ended) { 1342 ended = true; 1343 deferredEnded = false; 1344 if (id) clearTimeout(id); 1345 talkback(TalkbackKind.Close); 1346 } else if (!ended) { 1347 talkback(TalkbackKind.Pull); 1348 } 1349 }) 1350 ); 1351 } else { 1352 if (id) clearTimeout(id); 1353 id = setTimeout(() => { 1354 id = undefined; 1355 sink(signal); 1356 if (deferredEnded) sink(SignalKind.End); 1357 }, timing(signal[0])); 1358 } 1359 }); 1360 }; 1361} 1362 1363/** Delays each signal emitted by a Source by given time (ms). 1364 * 1365 * @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed. 1366 * @returns An {@link Operator}. 1367 * 1368 * @remarks 1369 * `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by. 1370 * This will create a timeout per received signal and delay the emitted values accordingly. 1371 * 1372 * Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to 1373 * be ordered. Otherwise, signals will arrive in the wrong order at the sink. 1374 */ 1375export function delay<T>(wait: number): Operator<T, T> { 1376 return source => sink => { 1377 let active = 0; 1378 source(signal => { 1379 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 1380 sink(signal); 1381 } else { 1382 active++; 1383 setTimeout(() => { 1384 if (active) { 1385 active--; 1386 sink(signal); 1387 } 1388 }, wait); 1389 } 1390 }); 1391 }; 1392} 1393 1394/** Throttles a Source by omitting values that are emitted before a given timeout. 1395 * 1396 * @param timing - A function returning a throttle time (ms) per emitted value. 1397 * @returns An {@link Operator}. 1398 * 1399 * @remarks 1400 * `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted 1401 * value. During the returned timeframe all values issued by the {@link Source} will be omitted and 1402 * dropped. 1403 * 1404 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 1405 * too frequently. 1406 * 1407 * @example 1408 * ```ts 1409 * pipe( 1410 * interval(50), 1411 * throttle(() => 100), 1412 * subscribe(x => { 1413 * // omits every second value: 0, 2, 4... 1414 * console.log(text); 1415 * }) 1416 * ); 1417 * ``` 1418 */ 1419export function throttle<T>(timing: (value: T) => number): Operator<T, T> { 1420 return source => sink => { 1421 let skip = false; 1422 let id: any | void; 1423 source(signal => { 1424 if (signal === SignalKind.End) { 1425 if (id) clearTimeout(id); 1426 sink(SignalKind.End); 1427 } else if (signal.tag === SignalKind.Start) { 1428 const talkback = signal[0]; 1429 sink( 1430 start(signal => { 1431 if (signal === TalkbackKind.Close) { 1432 if (id) clearTimeout(id); 1433 talkback(TalkbackKind.Close); 1434 } else { 1435 talkback(TalkbackKind.Pull); 1436 } 1437 }) 1438 ); 1439 } else if (!skip) { 1440 skip = true; 1441 if (id) clearTimeout(id); 1442 id = setTimeout(() => { 1443 id = undefined; 1444 skip = false; 1445 }, timing(signal[0])); 1446 sink(signal); 1447 } 1448 }); 1449 }; 1450} 1451 1452export { mergeAll as flatten, onPush as tap };