Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.3.5 42 kB view raw
1import { Push, Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3import { fromArray } from './sources'; 4 5const identity = <T>(x: T): T => x; 6 7/** Buffers values and emits the array of bufferd values each time a `notifier` Source emits. 8 * 9 * @param notifier - A {@link Source} that releases the current buffer. 10 * @returns An {@link Operator}. 11 * 12 * @remarks 13 * `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source 14 * emits, it will emit an array of all buffered values. 15 * 16 * This can be used to group values over time. A buffer will only be emitted when it contains any 17 * values. 18 * 19 * @example 20 * ```ts 21 * pipe( 22 * interval(50), 23 * buffer(interval(100)), 24 * subscribe(x => { 25 * console.log(text); // logs: [0], [1, 2], [3, 4]... 26 * }) 27 * ); 28 * ``` 29 */ 30export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> { 31 return source => sink => { 32 let buffer: T[] = []; 33 let sourceTalkback = talkbackPlaceholder; 34 let notifierTalkback = talkbackPlaceholder; 35 let pulled = false; 36 let ended = false; 37 source(signal => { 38 if (ended) { 39 /*noop*/ 40 } else if (signal === SignalKind.End) { 41 ended = true; 42 notifierTalkback(TalkbackKind.Close); 43 if (buffer.length) sink(push(buffer)); 44 sink(SignalKind.End); 45 } else if (signal.tag === SignalKind.Start) { 46 sourceTalkback = signal[0]; 47 notifier(signal => { 48 if (ended) { 49 /*noop*/ 50 } else if (signal === SignalKind.End) { 51 ended = true; 52 sourceTalkback(TalkbackKind.Close); 53 if (buffer.length) sink(push(buffer)); 54 sink(SignalKind.End); 55 } else if (signal.tag === SignalKind.Start) { 56 notifierTalkback = signal[0]; 57 } else if (buffer.length) { 58 const signal = push(buffer); 59 buffer = []; 60 sink(signal); 61 } 62 }); 63 } else { 64 buffer.push(signal[0]); 65 if (!pulled) { 66 pulled = true; 67 sourceTalkback(TalkbackKind.Pull); 68 notifierTalkback(TalkbackKind.Pull); 69 } else { 70 pulled = false; 71 } 72 } 73 }); 74 sink( 75 start(signal => { 76 if (signal === TalkbackKind.Close && !ended) { 77 ended = true; 78 sourceTalkback(TalkbackKind.Close); 79 notifierTalkback(TalkbackKind.Close); 80 } else if (!ended && !pulled) { 81 pulled = true; 82 sourceTalkback(TalkbackKind.Pull); 83 notifierTalkback(TalkbackKind.Pull); 84 } 85 }) 86 ); 87 }; 88} 89 90/** Emits in order from the Sources returned by a mapping function per value of the Source. 91 * 92 * @param map - A function returning a {@link Source} per value. 93 * @returns An {@link Operator}. 94 * 95 * @remarks 96 * `concatMap` accepts a mapping function which must return a {@link Source} per value. 97 * The output {@link Source} will emit values from each Source the function returned, in order, 98 * queuing sources that aren't yet active. 99 * 100 * This can be used to issue multiple values per emission of an input {@link Source}, while keeping 101 * the order of their outputs consistent. 102 * 103 * @example 104 * ```ts 105 * pipe( 106 * fromArray([1, 2]), 107 * concatMap(x => fromArray([x, x * 2])), 108 * subscribe(x => { 109 * console.log(text); // logs: 1, 2, 2, 4 110 * }) 111 * ); 112 * ``` 113 */ 114export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 115 return source => sink => { 116 const inputQueue: In[] = []; 117 let outerTalkback = talkbackPlaceholder; 118 let innerTalkback = talkbackPlaceholder; 119 let outerPulled = false; 120 let innerPulled = false; 121 let innerActive = false; 122 let ended = false; 123 function applyInnerSource(innerSource: Source<Out>): void { 124 innerActive = true; 125 innerSource(signal => { 126 if (signal === SignalKind.End) { 127 if (innerActive) { 128 innerActive = false; 129 if (inputQueue.length) { 130 applyInnerSource(map(inputQueue.shift()!)); 131 } else if (ended) { 132 sink(SignalKind.End); 133 } else if (!outerPulled) { 134 outerPulled = true; 135 outerTalkback(TalkbackKind.Pull); 136 } 137 } 138 } else if (signal.tag === SignalKind.Start) { 139 innerPulled = false; 140 (innerTalkback = signal[0])(TalkbackKind.Pull); 141 } else if (innerActive) { 142 sink(signal); 143 if (innerPulled) { 144 innerPulled = false; 145 } else { 146 innerTalkback(TalkbackKind.Pull); 147 } 148 } 149 }); 150 } 151 source(signal => { 152 if (ended) { 153 /*noop*/ 154 } else if (signal === SignalKind.End) { 155 ended = true; 156 if (!innerActive && !inputQueue.length) sink(SignalKind.End); 157 } else if (signal.tag === SignalKind.Start) { 158 outerTalkback = signal[0]; 159 } else { 160 outerPulled = false; 161 if (innerActive) { 162 inputQueue.push(signal[0]); 163 } else { 164 applyInnerSource(map(signal[0])); 165 } 166 } 167 }); 168 sink( 169 start(signal => { 170 if (signal === TalkbackKind.Close) { 171 if (!ended) { 172 ended = true; 173 outerTalkback(TalkbackKind.Close); 174 } 175 if (innerActive) { 176 innerActive = false; 177 innerTalkback(TalkbackKind.Close); 178 } 179 } else { 180 if (!ended && !outerPulled) { 181 outerPulled = true; 182 outerTalkback(TalkbackKind.Pull); 183 } 184 if (innerActive && !innerPulled) { 185 innerPulled = true; 186 innerTalkback(TalkbackKind.Pull); 187 } 188 } 189 }) 190 ); 191 }; 192} 193 194/** Flattens a Source emitting Sources into a single Source emitting the inner values in order. 195 * 196 * @see {@link concatMap} which this helper uses and instead accept a mapping function. 197 * @param source - An {@link Source} emitting {@link Source | Sources}. 198 * @returns A {@link Source} emitting values from the inner Sources. 199 * 200 * @remarks 201 * `concatAll` accepts a {@link Source} emitting {@link Source | Sources}. 202 * The output {@link Source} will emit values from each Source, in order, queuing sources that 203 * aren't yet active. 204 * 205 * @example 206 * ```ts 207 * pipe( 208 * fromArray([ 209 * fromArray([1, 2]), 210 * fromArray([3, 4]), 211 * ]), 212 * concatAll, 213 * subscribe(x => { 214 * console.log(text); // logs: 1, 2, 3, 4 215 * }) 216 * ); 217 * ``` 218 */ 219export function concatAll<T>(source: Source<Source<T>>): Source<T> { 220 return concatMap<Source<T>, T>(identity)(source); 221} 222 223/** Emits values from the passed sources in order. 224 * 225 * @param sources - An array of {@link Source | Sources}. 226 * @returns A {@link Source} emitting values from the input Sources. 227 * 228 * @remarks 229 * `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting 230 * with the first one and continuing to the next only when the prior source ended. 231 * 232 * This can be used to issue combine sources while keeping the order of their values intact. 233 * 234 * @example 235 * ```ts 236 * pipe( 237 * concat([ 238 * fromArray([1, 2]), 239 * fromArray([3, 4]), 240 * ]), 241 * subscribe(x => { 242 * console.log(text); // logs: 1, 2, 3, 4 243 * }) 244 * ); 245 * ``` 246 */ 247export function concat<T>(sources: Source<T>[]): Source<T> { 248 return concatAll(fromArray(sources)); 249} 250 251/** Filters out emitted values for which the passed predicate function returns `false`. 252 * 253 * @param predicate - A function returning a boolean per value. 254 * @returns An {@link Operator}. 255 * 256 * @remarks 257 * `filter` will omit values from the {@link Source} for which the passed `predicate` function 258 * returns `false`. 259 * 260 * @example 261 * ```ts 262 * pipe( 263 * fromArray([1, 2, 3]), 264 * filter(x => x % 2 === 0), 265 * subscribe(x => { 266 * console.log(text); // logs: 2 267 * }) 268 * ); 269 * ``` 270 */ 271function filter<In, Out extends In>(predicate: (value: In) => value is Out): Operator<In, Out>; 272function filter<T>(predicate: (value: T) => boolean): Operator<T, T>; 273function filter<In, Out>(predicate: (value: In) => boolean): Operator<In, Out> { 274 return source => sink => { 275 let talkback = talkbackPlaceholder; 276 source(signal => { 277 if (signal === SignalKind.End) { 278 sink(SignalKind.End); 279 } else if (signal.tag === SignalKind.Start) { 280 talkback = signal[0]; 281 sink(signal); 282 } else if (!predicate(signal[0])) { 283 talkback(TalkbackKind.Pull); 284 } else { 285 sink(signal as Push<any>); 286 } 287 }); 288 }; 289} 290 291export { filter }; 292 293/** Maps emitted values using the passed mapping function. 294 * 295 * @param map - A function returning transforming the {@link Source | Source's} values. 296 * @returns An {@link Operator}. 297 * 298 * @remarks 299 * `map` accepts a transform function and calls it on each emitted value. It then emits 300 * the values returned by the transform function instead. 301 * 302 * @example 303 * ```ts 304 * pipe( 305 * fromArray([1, 2, 3]), 306 * map(x => x * 2), 307 * subscribe(x => { 308 * console.log(text); // logs: 2, 4, 6 309 * }) 310 * ); 311 * ``` 312 */ 313export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> { 314 return source => sink => 315 source(signal => { 316 if (signal === SignalKind.End || signal.tag === SignalKind.Start) { 317 sink(signal); 318 } else { 319 sink(push(map(signal[0]))); 320 } 321 }); 322} 323 324/** Emits from the Sources returned by a mapping function per value of the Source. 325 * 326 * @param map - A function returning a {@link Source} per value. 327 * @returns An {@link Operator}. 328 * 329 * @remarks 330 * `mergeMap` accepts a mapping function which must return a {@link Source} per value. 331 * The output {@link Source} will emit values from all {@link Source | Sources} the mapping function 332 * returned. 333 * 334 * This can be used to issue multiple values per emission of an input {@link Source}, essentially 335 * multiplexing all values to multiple Sources. 336 * 337 * @example 338 * ```ts 339 * pipe( 340 * interval(50), 341 * mergeMap(x => pipe( 342 * fromValue(x), 343 * delay(100) 344 * )), 345 * subscribe(x => { 346 * console.log(text); // logs: 0, 1, 2... 347 * }) 348 * ); 349 * ``` 350 */ 351export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 352 return source => sink => { 353 let innerTalkbacks: TalkbackFn[] = []; 354 let outerTalkback = talkbackPlaceholder; 355 let outerPulled = false; 356 let ended = false; 357 function applyInnerSource(innerSource: Source<Out>): void { 358 let talkback = talkbackPlaceholder; 359 innerSource(signal => { 360 if (signal === SignalKind.End) { 361 if (innerTalkbacks.length) { 362 const index = innerTalkbacks.indexOf(talkback); 363 if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1); 364 if (!innerTalkbacks.length) { 365 if (ended) { 366 sink(SignalKind.End); 367 } else if (!outerPulled) { 368 outerPulled = true; 369 outerTalkback(TalkbackKind.Pull); 370 } 371 } 372 } 373 } else if (signal.tag === SignalKind.Start) { 374 innerTalkbacks.push((talkback = signal[0])); 375 talkback(TalkbackKind.Pull); 376 } else if (innerTalkbacks.length) { 377 sink(signal); 378 talkback(TalkbackKind.Pull); 379 } 380 }); 381 } 382 source(signal => { 383 if (ended) { 384 /*noop*/ 385 } else if (signal === SignalKind.End) { 386 ended = true; 387 if (!innerTalkbacks.length) sink(SignalKind.End); 388 } else if (signal.tag === SignalKind.Start) { 389 outerTalkback = signal[0]; 390 } else { 391 outerPulled = false; 392 applyInnerSource(map(signal[0])); 393 if (!outerPulled) { 394 outerPulled = true; 395 outerTalkback(TalkbackKind.Pull); 396 } 397 } 398 }); 399 sink( 400 start(signal => { 401 if (signal === TalkbackKind.Close) { 402 if (!ended) { 403 ended = true; 404 outerTalkback(TalkbackKind.Close); 405 } 406 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 407 a[i](TalkbackKind.Close); 408 innerTalkbacks.length = 0; 409 } else { 410 if (!ended && !outerPulled) { 411 outerPulled = true; 412 outerTalkback(TalkbackKind.Pull); 413 } else { 414 outerPulled = false; 415 } 416 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 417 a[i](TalkbackKind.Pull); 418 } 419 }) 420 ); 421 }; 422} 423 424/** Flattens a Source emitting Sources into a single Source emitting the inner values. 425 * 426 * @see {@link mergeMap} which this helper uses and instead accept a mapping function. 427 * @param source - An {@link Source} emitting {@link Source | Sources}. 428 * @returns A {@link Source} emitting values from the inner Sources. 429 * 430 * @remarks 431 * `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe 432 * to each incoming source immediately and start passing its emitted values through. 433 * 434 * @example 435 * ```ts 436 * pipe( 437 * fromArray([ 438 * interval(50), 439 * interval(100), 440 * ]), 441 * mergeAll, 442 * subscribe(x => { 443 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 444 * }) 445 * ); 446 * ``` 447 */ 448export function mergeAll<T>(source: Source<Source<T>>): Source<T> { 449 return mergeMap<Source<T>, T>(identity)(source); 450} 451 452/** Emits values from the passed sources simultaneously. 453 * 454 * @param sources - An array of {@link Source | Sources}. 455 * @returns A {@link Source} emitting values from the input Sources. 456 * 457 * @remarks 458 * `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing 459 * through all their emitted values simultaneously. 460 * 461 * This can be used to interleave the values of multiple sources. 462 * 463 * @example 464 * ```ts 465 * pipe( 466 * merge([ 467 * interval(50), 468 * interval(100), 469 * ]), 470 * subscribe(x => { 471 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 472 * }) 473 * ); 474 * ``` 475 */ 476export function merge<T>(sources: Source<T>[]): Source<T> { 477 return mergeAll(fromArray(sources)); 478} 479 480/** Calls the passed callback function when the Source ends or is closed. 481 * 482 * @param callback - A function that is called when the {@link Source} ends. 483 * @returns An {@link Operator}. 484 * 485 * @remarks 486 * `onEnd` accepts a callback which is called when the {@link Source} either ends 487 * or is closed. 488 * 489 * This operator can be used to add side-effects to a Source. 490 * 491 * @example 492 * ```ts 493 * pipe( 494 * fromArray([1, 2, 3]), 495 * take(1), 496 * onEnd(() => { 497 * console.log('end'); 498 * }), 499 * publish 500 * ); 501 * ``` 502 */ 503export function onEnd<T>(callback: () => void): Operator<T, T> { 504 return source => sink => { 505 let ended = false; 506 source(signal => { 507 if (ended) { 508 /*noop*/ 509 } else if (signal === SignalKind.End) { 510 ended = true; 511 sink(SignalKind.End); 512 callback(); 513 } else if (signal.tag === SignalKind.Start) { 514 const talkback = signal[0]; 515 sink( 516 start(signal => { 517 if (signal === TalkbackKind.Close) { 518 ended = true; 519 talkback(TalkbackKind.Close); 520 callback(); 521 } else { 522 talkback(signal); 523 } 524 }) 525 ); 526 } else { 527 sink(signal); 528 } 529 }); 530 }; 531} 532 533/** Calls the passed callback function when the Source emits a value. 534 * 535 * @param callback - A function that is called with each value the {@link Source} emits. 536 * @returns An {@link Operator}. 537 * 538 * @remarks 539 * `onPush` accepts a callback which is called for every emitted value of 540 * the {@link Source}. 541 * 542 * This operator can be used to add side-effects to a Source. 543 * 544 * @example 545 * ```ts 546 * pipe( 547 * fromArray([1, 2, 3]), 548 * onPush(value => { 549 * console.log(value); // logs: 1, 2, 3 550 * }), 551 * publish 552 * ); 553 * ``` 554 */ 555export function onPush<T>(callback: (value: T) => void): Operator<T, T> { 556 return source => sink => { 557 let ended = false; 558 source(signal => { 559 if (ended) { 560 /*noop*/ 561 } else if (signal === SignalKind.End) { 562 ended = true; 563 sink(SignalKind.End); 564 } else if (signal.tag === SignalKind.Start) { 565 const talkback = signal[0]; 566 sink( 567 start(signal => { 568 if (signal === TalkbackKind.Close) ended = true; 569 talkback(signal); 570 }) 571 ); 572 } else { 573 callback(signal[0]); 574 sink(signal); 575 } 576 }); 577 }; 578} 579 580/** Calls the passed callback function when the Source starts. 581 * 582 * @param callback - A function that is called when the {@link Source} is started. 583 * @returns An {@link Operator}. 584 * 585 * @remarks 586 * `onPush` accepts a callback which is called for every emitted value of 587 * the {@link Source}. 588 * 589 * This operator can be used to add side-effects to a Source. 590 * Specifically, it's useful to add a side-effect for a Source that triggers only once 591 * the {@link Source} is used and started. 592 * 593 * @example 594 * ```ts 595 * pipe( 596 * fromArray([1, 2, 3]), 597 * onStart(() => { 598 * console.log('start'); 599 * }), 600 * publish 601 * ); 602 * ``` 603 */ 604export function onStart<T>(callback: () => void): Operator<T, T> { 605 return source => sink => 606 source(signal => { 607 if (signal === SignalKind.End) { 608 sink(SignalKind.End); 609 } else if (signal.tag === SignalKind.Start) { 610 sink(signal); 611 callback(); 612 } else { 613 sink(signal); 614 } 615 }); 616} 617 618/** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value. 619 * 620 * @param notifier - A {@link Source} that triggers the last value to be emitted. 621 * @returns An {@link Operator}. 622 * 623 * @remarks 624 * `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source 625 * emits, it will emit the latest value. 626 * 627 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 628 * too frequently. 629 * 630 * {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be 631 * ignored. 632 * 633 * @example 634 * ```ts 635 * pipe( 636 * interval(50), 637 * sample(interval(100)), 638 * subscribe(x => { 639 * console.log(text); // logs: 0, 2, 4... 640 * }) 641 * ); 642 * ``` 643 */ 644export function sample<S, T>(notifier: Source<S>): Operator<T, T> { 645 return source => sink => { 646 let sourceTalkback = talkbackPlaceholder; 647 let notifierTalkback = talkbackPlaceholder; 648 let value: T | void; 649 let pulled = false; 650 let ended = false; 651 source(signal => { 652 if (ended) { 653 /*noop*/ 654 } else if (signal === SignalKind.End) { 655 ended = true; 656 notifierTalkback(TalkbackKind.Close); 657 sink(SignalKind.End); 658 } else if (signal.tag === SignalKind.Start) { 659 sourceTalkback = signal[0]; 660 } else { 661 value = signal[0]; 662 if (!pulled) { 663 pulled = true; 664 notifierTalkback(TalkbackKind.Pull); 665 sourceTalkback(TalkbackKind.Pull); 666 } else { 667 pulled = false; 668 } 669 } 670 }); 671 notifier(signal => { 672 if (ended) { 673 /*noop*/ 674 } else if (signal === SignalKind.End) { 675 ended = true; 676 sourceTalkback(TalkbackKind.Close); 677 sink(SignalKind.End); 678 } else if (signal.tag === SignalKind.Start) { 679 notifierTalkback = signal[0]; 680 } else if (value !== undefined) { 681 const signal = push(value); 682 value = undefined; 683 sink(signal); 684 } 685 }); 686 sink( 687 start(signal => { 688 if (signal === TalkbackKind.Close && !ended) { 689 ended = true; 690 sourceTalkback(TalkbackKind.Close); 691 notifierTalkback(TalkbackKind.Close); 692 } else if (!ended && !pulled) { 693 pulled = true; 694 sourceTalkback(TalkbackKind.Pull); 695 notifierTalkback(TalkbackKind.Pull); 696 } 697 }) 698 ); 699 }; 700} 701 702/** Maps emitted values using the passed reducer function. 703 * 704 * @param reducer - A function called with the last value by the `reducer` and the emitted value. 705 * @param seed - The initial value that is passed to the `reducer`. 706 * @returns An {@link Operator}. 707 * 708 * @remarks 709 * `scan` accepts a reducer function and a seed value. The reducer will be called initially with the 710 * seed value and the first emitted value. The {@link Source} will then emit the value returned by 711 * the reducer function. Subsequently, the `reducer` is called with the last value the `reducer` 712 * returned and the emitted value. 713 * 714 * This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits 715 * each value of the reducer. 716 * 717 * @example 718 * ```ts 719 * pipe( 720 * fromArray([1, 2, 3]), 721 * scan((acc, x) => acc + x, 0), 722 * subscribe(x => { 723 * console.log(text); // logs: 1, 3, 6 724 * }) 725 * ); 726 * ``` 727 */ 728export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> { 729 return source => sink => { 730 let acc = seed; 731 source(signal => { 732 if (signal === SignalKind.End) { 733 sink(SignalKind.End); 734 } else if (signal.tag === SignalKind.Start) { 735 sink(signal); 736 } else { 737 sink(push((acc = reducer(acc, signal[0])))); 738 } 739 }); 740 }; 741} 742 743/** Shares one underlying subscription to the Source between all Sinks. 744 * 745 * @param source - A {@link Source} that should be shared. 746 * @returns A shared {@link Source}. 747 * 748 * @remarks 749 * `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it 750 * will share one subscription to the input source. This allows side-effects on the input 751 * {@link Source} to only be triggerd once. 752 */ 753export function share<T>(source: Source<T>): Source<T> { 754 let sinks: Sink<T>[] = []; 755 let talkback = talkbackPlaceholder; 756 let gotSignal = false; 757 return sink => { 758 sinks.push(sink); 759 if (sinks.length === 1) { 760 source(signal => { 761 if (signal === SignalKind.End) { 762 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End); 763 sinks.length = 0; 764 } else if (signal.tag === SignalKind.Start) { 765 talkback = signal[0]; 766 } else { 767 gotSignal = false; 768 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal); 769 } 770 }); 771 } 772 sink( 773 start(signal => { 774 if (signal === TalkbackKind.Close) { 775 const index = sinks.indexOf(sink); 776 if (index > -1) (sinks = sinks.slice()).splice(index, 1); 777 if (!sinks.length) talkback(TalkbackKind.Close); 778 } else if (!gotSignal) { 779 gotSignal = true; 780 talkback(TalkbackKind.Pull); 781 } 782 }) 783 ); 784 }; 785} 786 787/** Omits `wait` amount of values from the Source and then runs as usual. 788 * 789 * @param wait - The number of values to be omitted. 790 * @returns An {@link Operator}. 791 * 792 * @remarks 793 * `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards. 794 * This essentially skips a given number of values on the input {@link Source}. 795 * 796 * @example 797 * ```ts 798 * pipe( 799 * fromArray([1, 2, 3]), 800 * skip(2), 801 * subscribe(x => { 802 * console.log(text); // logs: 3 803 * }) 804 * ); 805 * ``` 806 */ 807export function skip<T>(wait: number): Operator<T, T> { 808 return source => sink => { 809 let talkback = talkbackPlaceholder; 810 let rest = wait; 811 source(signal => { 812 if (signal === SignalKind.End) { 813 sink(SignalKind.End); 814 } else if (signal.tag === SignalKind.Start) { 815 talkback = signal[0]; 816 sink(signal); 817 } else if (rest-- > 0) { 818 talkback(TalkbackKind.Pull); 819 } else { 820 sink(signal); 821 } 822 }); 823 }; 824} 825 826/** Omits values from an input Source until a notifier Source emits a value. 827 * 828 * @param notifier - A {@link Source} that starts the operator's sent values. 829 * @returns An {@link Operator}. 830 * 831 * @remarks 832 * `skipUntil` will omit all values from the input {@link Source} until the `notifier` 833 * Source emits a value of its own. It'll then start passing values from the Source through. 834 * 835 * @example 836 * ```ts 837 * pipe( 838 * interval(50), 839 * skipUntil(interval(150)), 840 * subscribe(x => { 841 * console.log(text); // logs: 2, 3... 842 * }) 843 * ); 844 * ``` 845 */ 846export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> { 847 return source => sink => { 848 let sourceTalkback = talkbackPlaceholder; 849 let notifierTalkback = talkbackPlaceholder; 850 let skip = true; 851 let pulled = false; 852 let ended = false; 853 source(signal => { 854 if (ended) { 855 /*noop*/ 856 } else if (signal === SignalKind.End) { 857 ended = true; 858 if (skip) notifierTalkback(TalkbackKind.Close); 859 sink(SignalKind.End); 860 } else if (signal.tag === SignalKind.Start) { 861 sourceTalkback = signal[0]; 862 notifier(signal => { 863 if (signal === SignalKind.End) { 864 if (skip) { 865 ended = true; 866 sourceTalkback(TalkbackKind.Close); 867 } 868 } else if (signal.tag === SignalKind.Start) { 869 (notifierTalkback = signal[0])(TalkbackKind.Pull); 870 } else { 871 skip = false; 872 notifierTalkback(TalkbackKind.Close); 873 } 874 }); 875 } else if (!skip) { 876 pulled = false; 877 sink(signal); 878 } else if (!pulled) { 879 pulled = true; 880 sourceTalkback(TalkbackKind.Pull); 881 notifierTalkback(TalkbackKind.Pull); 882 } else { 883 pulled = false; 884 } 885 }); 886 sink( 887 start(signal => { 888 if (signal === TalkbackKind.Close && !ended) { 889 ended = true; 890 sourceTalkback(TalkbackKind.Close); 891 if (skip) notifierTalkback(TalkbackKind.Close); 892 } else if (!ended && !pulled) { 893 pulled = true; 894 if (skip) notifierTalkback(TalkbackKind.Pull); 895 sourceTalkback(TalkbackKind.Pull); 896 } 897 }) 898 ); 899 }; 900} 901 902/** Omits values from an input Source until a predicate function returns `false`. 903 * 904 * @param predicate - A function returning a boolean per value. 905 * @returns An {@link Operator}. 906 * 907 * @remarks 908 * `skipWhile` will omit all values from the input {@link Source} until the `predicate` 909 * function returns `false`. When the `predicate` function returns `false`, the Source's values will 910 * be passed through. 911 * 912 * @example 913 * ```ts 914 * pipe( 915 * fromArray([1, 2, 3]), 916 * skipWhile(x => x < 2), 917 * subscribe(x => { 918 * console.log(text); // logs: 2, 3 919 * }) 920 * ); 921 * ``` 922 */ 923export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 924 return source => sink => { 925 let talkback = talkbackPlaceholder; 926 let skip = true; 927 source(signal => { 928 if (signal === SignalKind.End) { 929 sink(SignalKind.End); 930 } else if (signal.tag === SignalKind.Start) { 931 talkback = signal[0]; 932 sink(signal); 933 } else if (skip) { 934 if (predicate(signal[0])) { 935 talkback(TalkbackKind.Pull); 936 } else { 937 skip = false; 938 sink(signal); 939 } 940 } else { 941 sink(signal); 942 } 943 }); 944 }; 945} 946 947/** Emits from the latest Source returned by a mapping function per value of the Source. 948 * 949 * @param map - A function returning a {@link Source} per value. 950 * @returns An {@link Operator}. 951 * 952 * @remarks 953 * `switchMap` accepts a mapping function which must return a {@link Source} per value. 954 * The output {@link Source} will emit values from the latest Source the mapping function 955 * returned. If a value is emitted while the last returned Source is still active, the prior Source 956 * will be closed. 957 * 958 * This can be used to issue multiple values per emission of an input {@link Source}, while only 959 * letting one of these sub-Sources be active at a time. 960 * 961 * @example 962 * ```ts 963 * pipe( 964 * interval(100), 965 * switchMap(() => interval(50)), 966 * subscribe(x => { 967 * console.log(text); // logs: 0, 0, 0... 968 * }) 969 * ); 970 * ``` 971 */ 972export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 973 return source => sink => { 974 let outerTalkback = talkbackPlaceholder; 975 let innerTalkback = talkbackPlaceholder; 976 let outerPulled = false; 977 let innerPulled = false; 978 let innerActive = false; 979 let ended = false; 980 function applyInnerSource(innerSource: Source<Out>): void { 981 innerActive = true; 982 innerSource(signal => { 983 if (!innerActive) { 984 /*noop*/ 985 } else if (signal === SignalKind.End) { 986 innerActive = false; 987 if (ended) { 988 sink(SignalKind.End); 989 } else if (!outerPulled) { 990 outerPulled = true; 991 outerTalkback(TalkbackKind.Pull); 992 } 993 } else if (signal.tag === SignalKind.Start) { 994 innerPulled = false; 995 (innerTalkback = signal[0])(TalkbackKind.Pull); 996 } else { 997 sink(signal); 998 if (!innerPulled) { 999 innerTalkback(TalkbackKind.Pull); 1000 } else { 1001 innerPulled = false; 1002 } 1003 } 1004 }); 1005 } 1006 source(signal => { 1007 if (ended) { 1008 /*noop*/ 1009 } else if (signal === SignalKind.End) { 1010 ended = true; 1011 if (!innerActive) sink(SignalKind.End); 1012 } else if (signal.tag === SignalKind.Start) { 1013 outerTalkback = signal[0]; 1014 } else { 1015 if (innerActive) { 1016 innerTalkback(TalkbackKind.Close); 1017 innerTalkback = talkbackPlaceholder; 1018 } 1019 if (!outerPulled) { 1020 outerPulled = true; 1021 outerTalkback(TalkbackKind.Pull); 1022 } else { 1023 outerPulled = false; 1024 } 1025 applyInnerSource(map(signal[0])); 1026 } 1027 }); 1028 sink( 1029 start(signal => { 1030 if (signal === TalkbackKind.Close) { 1031 if (!ended) { 1032 ended = true; 1033 outerTalkback(TalkbackKind.Close); 1034 } 1035 if (innerActive) { 1036 innerActive = false; 1037 innerTalkback(TalkbackKind.Close); 1038 } 1039 } else { 1040 if (!ended && !outerPulled) { 1041 outerPulled = true; 1042 outerTalkback(TalkbackKind.Pull); 1043 } 1044 if (innerActive && !innerPulled) { 1045 innerPulled = true; 1046 innerTalkback(TalkbackKind.Pull); 1047 } 1048 } 1049 }) 1050 ); 1051 }; 1052} 1053 1054/** Flattens a Source emitting Sources into a single Source emitting the inner values. 1055 * 1056 * @see {@link switchMap} which this helper uses and instead accept a mapping function. 1057 * @param source - An {@link Source} emitting {@link Source | Sources}. 1058 * @returns A {@link Source} emitting values from the inner Sources. 1059 * 1060 * @remarks 1061 * `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it 1062 * receives a {@link Source} it will close its prior subscription and subscribe to the new Source 1063 * instead, passing through its values. 1064 * 1065 * @example 1066 * ```ts 1067 * pipe( 1068 * interval(100), 1069 * map(() => interval(50)), 1070 * switchAll, 1071 * subscribe(x => { 1072 * console.log(text); // logs: 0, 0, 0... 1073 * }) 1074 * ); 1075 * ``` 1076 */ 1077export function switchAll<T>(source: Source<Source<T>>): Source<T> { 1078 return switchMap<Source<T>, T>(identity)(source); 1079} 1080 1081/** Emits `max` values from the Source and then ends. 1082 * 1083 * @param max - The maximum number of values emitted. 1084 * @returns An {@link Operator}. 1085 * 1086 * @remarks 1087 * `take` will issue all values as normal until the `max` number of emitted values has been reached. 1088 * It will then end and close the {@link Source}. 1089 * 1090 * @example 1091 * ```ts 1092 * pipe( 1093 * fromArray([1, 2, 3]), 1094 * take(2), 1095 * subscribe(x => { 1096 * console.log(text); // logs: 1, 2 1097 * }) 1098 * ); 1099 * ``` 1100 */ 1101export function take<T>(max: number): Operator<T, T> { 1102 return source => sink => { 1103 let talkback = talkbackPlaceholder; 1104 let ended = false; 1105 let taken = 0; 1106 source(signal => { 1107 if (ended) { 1108 /*noop*/ 1109 } else if (signal === SignalKind.End) { 1110 ended = true; 1111 sink(SignalKind.End); 1112 } else if (signal.tag === SignalKind.Start) { 1113 if (max <= 0) { 1114 ended = true; 1115 sink(SignalKind.End); 1116 signal[0](TalkbackKind.Close); 1117 } else { 1118 talkback = signal[0]; 1119 } 1120 } else if (taken++ < max) { 1121 sink(signal); 1122 if (!ended && taken >= max) { 1123 ended = true; 1124 sink(SignalKind.End); 1125 talkback(TalkbackKind.Close); 1126 } 1127 } else { 1128 sink(signal); 1129 } 1130 }); 1131 sink( 1132 start(signal => { 1133 if (signal === TalkbackKind.Close && !ended) { 1134 ended = true; 1135 talkback(TalkbackKind.Close); 1136 } else if (signal === TalkbackKind.Pull && !ended && taken < max) { 1137 talkback(TalkbackKind.Pull); 1138 } 1139 }) 1140 ); 1141 }; 1142} 1143 1144/** Buffers the `max` last values of the Source and emits them once the Source ends. 1145 * 1146 * @param max - The maximum number of values buffered. 1147 * @returns An {@link Operator}. 1148 * 1149 * @remarks 1150 * `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It 1151 * will only emit values stored in the buffer once the {@link Source} ends. 1152 * 1153 * All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would 1154 * synchronously. 1155 * 1156 * @example 1157 * ```ts 1158 * pipe( 1159 * fromArray([1, 2, 3]), 1160 * takeLast(1), 1161 * subscribe(x => { 1162 * console.log(text); // logs: 3 1163 * }) 1164 * ); 1165 * ``` 1166 */ 1167export function takeLast<T>(max: number): Operator<T, T> { 1168 return source => sink => { 1169 const queue: T[] = []; 1170 let talkback = talkbackPlaceholder; 1171 source(signal => { 1172 if (signal === SignalKind.End) { 1173 fromArray(queue)(sink); 1174 } else if (signal.tag === SignalKind.Start) { 1175 if (max <= 0) { 1176 signal[0](TalkbackKind.Close); 1177 fromArray(queue)(sink); 1178 } else { 1179 (talkback = signal[0])(TalkbackKind.Pull); 1180 } 1181 } else { 1182 if (queue.length >= max && max) queue.shift(); 1183 queue.push(signal[0]); 1184 talkback(TalkbackKind.Pull); 1185 } 1186 }); 1187 }; 1188} 1189 1190/** Takes values from an input Source until a notifier Source emits a value. 1191 * 1192 * @param notifier - A {@link Source} that stops the operator's sent values. 1193 * @returns An {@link Operator}. 1194 * 1195 * @remarks 1196 * `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier` 1197 * Source emits a value of its own. It'll then close the {@link Source}. 1198 * 1199 * @example 1200 * ```ts 1201 * pipe( 1202 * interval(50), 1203 * takeUntil(interval(150)), 1204 * subscribe(x => { 1205 * console.log(text); // logs: 0, 1 1206 * }) 1207 * ); 1208 * ``` 1209 */ 1210export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> { 1211 return source => sink => { 1212 let sourceTalkback = talkbackPlaceholder; 1213 let notifierTalkback = talkbackPlaceholder; 1214 let ended = false; 1215 source(signal => { 1216 if (ended) { 1217 /*noop*/ 1218 } else if (signal === SignalKind.End) { 1219 ended = true; 1220 notifierTalkback(TalkbackKind.Close); 1221 sink(SignalKind.End); 1222 } else if (signal.tag === SignalKind.Start) { 1223 sourceTalkback = signal[0]; 1224 notifier(signal => { 1225 if (signal === SignalKind.End) { 1226 /*noop*/ 1227 } else if (signal.tag === SignalKind.Start) { 1228 (notifierTalkback = signal[0])(TalkbackKind.Pull); 1229 } else { 1230 ended = true; 1231 notifierTalkback(TalkbackKind.Close); 1232 sourceTalkback(TalkbackKind.Close); 1233 sink(SignalKind.End); 1234 } 1235 }); 1236 } else { 1237 sink(signal); 1238 } 1239 }); 1240 sink( 1241 start(signal => { 1242 if (signal === TalkbackKind.Close && !ended) { 1243 ended = true; 1244 sourceTalkback(TalkbackKind.Close); 1245 notifierTalkback(TalkbackKind.Close); 1246 } else if (!ended) { 1247 sourceTalkback(TalkbackKind.Pull); 1248 } 1249 }) 1250 ); 1251 }; 1252} 1253 1254/** Takes values from an input Source until a predicate function returns `false`. 1255 * 1256 * @param predicate - A function returning a boolean per value. 1257 * @param addOne - Lets an additional input value pass on. 1258 * @returns An {@link Operator}. 1259 * 1260 * @remarks 1261 * `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate` 1262 * function returns `false`. When the `predicate` function returns `false`, the current value is 1263 * omitted and the {@link Source} is closed. 1264 * 1265 * If `addOne` is set to `true`, the value for which the `predicate` first returned `false` is 1266 * issued and passed on as well instead of being omitted. 1267 * 1268 * @example 1269 * ```ts 1270 * pipe( 1271 * fromArray([1, 2, 3]), 1272 * takeWhile(x => x < 2), 1273 * subscribe(x => { 1274 * console.log(text); // logs: 1 1275 * }) 1276 * ); 1277 * ``` 1278 */ 1279export function takeWhile<T>(predicate: (value: T) => boolean, addOne?: boolean): Operator<T, T> { 1280 return source => sink => { 1281 let talkback = talkbackPlaceholder; 1282 let ended = false; 1283 source(signal => { 1284 if (ended) { 1285 /*noop*/ 1286 } else if (signal === SignalKind.End) { 1287 ended = true; 1288 sink(SignalKind.End); 1289 } else if (signal.tag === SignalKind.Start) { 1290 talkback = signal[0]; 1291 sink(signal); 1292 } else if (!predicate(signal[0])) { 1293 ended = true; 1294 if (addOne) sink(signal); 1295 sink(SignalKind.End); 1296 talkback(TalkbackKind.Close); 1297 } else { 1298 sink(signal); 1299 } 1300 }); 1301 }; 1302} 1303 1304/** Debounces a Source by omitting values until a given timeframe has passed. 1305 * 1306 * @param timing - A function returning a debounce time (ms) per emitted value. 1307 * @returns An {@link Operator}. 1308 * 1309 * @remarks 1310 * `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted 1311 * value. All emitted values issued by the {@link Source} during the returned time will be omitted 1312 * until the time has passed. 1313 * 1314 * Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a 1315 * value is let through. 1316 * 1317 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 1318 * too frequently. 1319 * 1320 * @example 1321 * ```ts 1322 * pipe( 1323 * interval(50), 1324 * debounce(() => 100), 1325 * subscribe(x => { 1326 * console.log(text); // never logs any value 1327 * }) 1328 * ); 1329 * ``` 1330 */ 1331export function debounce<T>(timing: (value: T) => number): Operator<T, T> { 1332 return source => sink => { 1333 let id: any | void; 1334 let deferredEnded = false; 1335 let ended = false; 1336 source(signal => { 1337 if (ended) { 1338 /*noop*/ 1339 } else if (signal === SignalKind.End) { 1340 ended = true; 1341 if (id) { 1342 deferredEnded = true; 1343 } else { 1344 sink(SignalKind.End); 1345 } 1346 } else if (signal.tag === SignalKind.Start) { 1347 const talkback = signal[0]; 1348 sink( 1349 start(signal => { 1350 if (signal === TalkbackKind.Close && !ended) { 1351 ended = true; 1352 deferredEnded = false; 1353 if (id) clearTimeout(id); 1354 talkback(TalkbackKind.Close); 1355 } else if (!ended) { 1356 talkback(TalkbackKind.Pull); 1357 } 1358 }) 1359 ); 1360 } else { 1361 if (id) clearTimeout(id); 1362 id = setTimeout(() => { 1363 id = undefined; 1364 sink(signal); 1365 if (deferredEnded) sink(SignalKind.End); 1366 }, timing(signal[0])); 1367 } 1368 }); 1369 }; 1370} 1371 1372/** Delays each signal emitted by a Source by given time (ms). 1373 * 1374 * @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed. 1375 * @returns An {@link Operator}. 1376 * 1377 * @remarks 1378 * `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by. 1379 * This will create a timeout per received signal and delay the emitted values accordingly. 1380 * 1381 * Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to 1382 * be ordered. Otherwise, signals will arrive in the wrong order at the sink. 1383 */ 1384export function delay<T>(wait: number): Operator<T, T> { 1385 return source => sink => { 1386 let active = 0; 1387 source(signal => { 1388 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 1389 sink(signal); 1390 } else { 1391 active++; 1392 setTimeout(() => { 1393 if (active) { 1394 active--; 1395 sink(signal); 1396 } 1397 }, wait); 1398 } 1399 }); 1400 }; 1401} 1402 1403/** Throttles a Source by omitting values that are emitted before a given timeout. 1404 * 1405 * @param timing - A function returning a throttle time (ms) per emitted value. 1406 * @returns An {@link Operator}. 1407 * 1408 * @remarks 1409 * `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted 1410 * value. During the returned timeframe all values issued by the {@link Source} will be omitted and 1411 * dropped. 1412 * 1413 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in 1414 * too frequently. 1415 * 1416 * @example 1417 * ```ts 1418 * pipe( 1419 * interval(50), 1420 * throttle(() => 100), 1421 * subscribe(x => { 1422 * // omits every second value: 0, 2, 4... 1423 * console.log(text); 1424 * }) 1425 * ); 1426 * ``` 1427 */ 1428export function throttle<T>(timing: (value: T) => number): Operator<T, T> { 1429 return source => sink => { 1430 let skip = false; 1431 let id: any | void; 1432 source(signal => { 1433 if (signal === SignalKind.End) { 1434 if (id) clearTimeout(id); 1435 sink(SignalKind.End); 1436 } else if (signal.tag === SignalKind.Start) { 1437 const talkback = signal[0]; 1438 sink( 1439 start(signal => { 1440 if (signal === TalkbackKind.Close) { 1441 if (id) clearTimeout(id); 1442 talkback(TalkbackKind.Close); 1443 } else { 1444 talkback(TalkbackKind.Pull); 1445 } 1446 }) 1447 ); 1448 } else if (!skip) { 1449 skip = true; 1450 if (id) clearTimeout(id); 1451 id = setTimeout(() => { 1452 id = undefined; 1453 skip = false; 1454 }, timing(signal[0])); 1455 sink(signal); 1456 } 1457 }); 1458 }; 1459} 1460 1461export { mergeAll as flatten, onPush as tap };