Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
import { push, start, talkbackPlaceholder } from './helpers';
import { fromArray } from './sources';

/** Returns its argument unchanged; used as the mapper of the `*All` operators. */
const identity = <T>(x: T): T => x;

/**
 * Collects the values pushed by the source into an array and emits that array
 * every time `notifier` pushes a value (empty buffers are not emitted).
 *
 * When either the source or the notifier ends, the other one is closed, any
 * non-empty buffer is flushed to the sink, and End is forwarded.
 *
 * @param notifier - Source whose pushes trigger a flush of the buffered values.
 * @returns An operator turning a source of `T` into a source of `T[]`.
 */
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
  return source => sink => {
    let buffer: T[] = [];
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let pulled = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        if (buffer.length) sink(push(buffer));
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        // The notifier is only subscribed to once the source has started.
        notifier(signal => {
          if (ended) {
            /*noop*/
          } else if (signal === SignalKind.End) {
            ended = true;
            sourceTalkback(TalkbackKind.Close);
            if (buffer.length) sink(push(buffer));
            sink(SignalKind.End);
          } else if (signal.tag === SignalKind.Start) {
            notifierTalkback = signal[0];
          } else if (buffer.length) {
            // Swap in a fresh array before emitting so the emitted one is no longer appended to.
            const signal = push(buffer);
            buffer = [];
            sink(signal);
          }
        });
      } else {
        buffer.push(signal[0]);
        if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        } else {
          pulled = false;
        }
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!ended && !pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Maps every value of the outer source to an inner source and forwards the
 * inner sources' values one inner source at a time.
 *
 * Outer values that arrive while an inner source is still active are queued
 * and processed in arrival order once the active inner source ends. End is
 * sent to the sink once the outer source has ended, no inner source is active
 * and the queue is empty.
 *
 * @param map - Creates the inner source for an outer value.
 */
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    const inputQueue: In[] = [];
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;
    /** Subscribes to `innerSource` and makes it the active inner source. */
    function applyInnerSource(innerSource: Source<Out>): void {
      innerActive = true;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (innerActive) {
            innerActive = false;
            if (inputQueue.length) {
              applyInnerSource(map(inputQueue.shift()!));
            } else if (ended) {
              sink(SignalKind.End);
            } else if (!outerPulled) {
              outerPulled = true;
              outerTalkback(TalkbackKind.Pull);
            }
          }
        } else if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          (innerTalkback = signal[0])(TalkbackKind.Pull);
        } else if (innerActive) {
          sink(signal);
          if (innerPulled) {
            innerPulled = false;
          } else {
            innerTalkback(TalkbackKind.Pull);
          }
        }
      });
    }
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive && !inputQueue.length) sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
      } else {
        outerPulled = false;
        if (innerActive) {
          inputQueue.push(signal[0]);
        } else {
          applyInnerSource(map(signal[0]));
        }
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
        } else {
          if (!ended && !outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          if (innerActive && !innerPulled) {
            innerPulled = true;
            innerTalkback(TalkbackKind.Pull);
          }
        }
      })
    );
  };
}

/** Flattens a source of sources with `concatMap`, using the identity mapper. */
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
  return concatMap<Source<T>, T>(identity)(source);
}

/** Concatenates the given sources by passing them through `fromArray` and `concatAll`. */
export function concat<T>(sources: Source<T>[]): Source<T> {
  return concatAll(fromArray(sources));
}

/**
 * Forwards only the values for which `predicate` returns `true`.
 *
 * A rejected value is dropped and the source is pulled again instead.
 *
 * @param predicate - Decides whether a value is forwarded to the sink.
 */
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
      } else if (!predicate(signal[0])) {
        talkback(TalkbackKind.Pull);
      } else {
        sink(signal);
      }
    });
  };
}

/**
 * Transforms every pushed value with `map`; Start and End signals are
 * forwarded unchanged.
 *
 * @param map - Transformation applied to each pushed value.
 */
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
  return source => sink =>
    source(signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Start) {
        sink(signal);
      } else {
        sink(push(map(signal[0])));
      }
    });
}

/**
 * Maps every value of the outer source to an inner source, subscribes to it
 * right away and forwards the values of all inner sources to the sink.
 *
 * End is sent to the sink once the outer source has ended and no inner
 * talkbacks remain registered.
 *
 * @param map - Creates the inner source for an outer value.
 */
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let innerTalkbacks: TalkbackFn[] = [];
    let outerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let ended = false;
    /** Subscribes to `innerSource` and registers its talkback in `innerTalkbacks`. */
    function applyInnerSource(innerSource: Source<Out>): void {
      let talkback = talkbackPlaceholder;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (innerTalkbacks.length) {
            const index = innerTalkbacks.indexOf(talkback);
            // Remove from a copy so a loop currently iterating the old array is unaffected.
            if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1);
            if (!innerTalkbacks.length) {
              if (ended) {
                sink(SignalKind.End);
              } else if (!outerPulled) {
                outerPulled = true;
                outerTalkback(TalkbackKind.Pull);
              }
            }
          }
        } else if (signal.tag === SignalKind.Start) {
          innerTalkbacks.push((talkback = signal[0]));
          talkback(TalkbackKind.Pull);
        } else if (innerTalkbacks.length) {
          sink(signal);
          talkback(TalkbackKind.Pull);
        }
      });
    }
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (!innerTalkbacks.length) sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
      } else {
        outerPulled = false;
        applyInnerSource(map(signal[0]));
        // applyInnerSource may already have pulled the outer source synchronously.
        if (!outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
            a[i](TalkbackKind.Close);
          innerTalkbacks.length = 0;
        } else {
          if (!ended && !outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          } else {
            outerPulled = false;
          }
          for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
            a[i](TalkbackKind.Pull);
        }
      })
    );
  };
}

/** Flattens a source of sources with `mergeMap`, using the identity mapper. */
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
  return mergeMap<Source<T>, T>(identity)(source);
}

/** Merges the given sources by passing them through `fromArray` and `mergeAll`. */
export function merge<T>(sources: Source<T>[]): Source<T> {
  return mergeAll(fromArray(sources));
}

/**
 * Invokes `callback` when the stream terminates: after End has been forwarded
 * to the sink, or after the source has been closed because the sink sent
 * Close.
 *
 * @param callback - Invoked on termination.
 */
export function onEnd<T>(callback: () => void): Operator<T, T> {
  return source => sink => {
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        callback();
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              ended = true;
              talkback(TalkbackKind.Close);
              callback();
            } else {
              talkback(signal);
            }
          })
        );
      } else {
        sink(signal);
      }
    });
  };
}

/**
 * Invokes `callback` with every pushed value before forwarding the value to
 * the sink. Nothing is forwarded after End or after the sink has sent Close.
 *
 * @param callback - Invoked with each pushed value.
 */
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
  return source => sink => {
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) ended = true;
            talkback(signal);
          })
        );
      } else {
        callback(signal[0]);
        sink(signal);
      }
    });
  };
}

/**
 * Invokes `callback` right after the source's Start signal has been forwarded
 * to the sink.
 *
 * @param callback - Invoked once the sink has received Start.
 */
export function onStart<T>(callback: () => void): Operator<T, T> {
  return source => sink =>
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sink(signal);
        callback();
      } else {
        sink(signal);
      }
    });
}

/**
 * Remembers the latest value pushed by the source and emits it when
 * `notifier` pushes a value. The remembered value is cleared on emission, so
 * it is emitted at most once; a remembered `undefined` is never emitted.
 *
 * When either the source or the notifier ends, the other one is closed and
 * End is forwarded.
 *
 * @param notifier - Source whose pushes trigger emission of the latest value.
 */
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let value: T | void;
    let pulled = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
      } else {
        value = signal[0];
        if (!pulled) {
          pulled = true;
          notifierTalkback(TalkbackKind.Pull);
          sourceTalkback(TalkbackKind.Pull);
        } else {
          pulled = false;
        }
      }
    });
    notifier(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sourceTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        notifierTalkback = signal[0];
      } else if (value !== undefined) {
        const signal = push(value);
        value = undefined;
        sink(signal);
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!ended && !pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Folds the pushed values with `reducer`, starting from `seed`, and emits the
 * accumulator after every value.
 *
 * @param reducer - Combines the current accumulator with the next value.
 * @param seed - Initial accumulator.
 */
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
  return source => sink => {
    let acc = seed;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sink(signal);
      } else {
        sink(push((acc = reducer(acc, signal[0]))));
      }
    });
  };
}

/**
 * Shares one subscription to `source` between all sinks of the returned
 * source.
 *
 * The underlying source is subscribed to when the sink list goes from empty
 * to one entry, every signal is broadcast to all registered sinks, and the
 * underlying source is closed when the last sink sends Close.
 *
 * @param source - Source to share.
 */
export function share<T>(source: Source<T>): Source<T> {
  let sinks: Sink<T>[] = [];
  let talkback = talkbackPlaceholder;
  let gotSignal = false;
  return sink => {
    sinks.push(sink);
    if (sinks.length === 1) {
      source(signal => {
        if (signal === SignalKind.End) {
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
          sinks.length = 0;
        } else if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
        } else {
          gotSignal = false;
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
        }
      });
    }
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          const index = sinks.indexOf(sink);
          // Remove from a copy so a broadcast loop iterating the old array is unaffected.
          if (index > -1) (sinks = sinks.slice()).splice(index, 1);
          if (!sinks.length) talkback(TalkbackKind.Close);
        } else if (!gotSignal) {
          // Only one Pull is passed upstream until the next value arrives.
          gotSignal = true;
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Drops the first `wait` pushed values (pulling the source again for each)
 * and forwards everything after them.
 *
 * @param wait - Number of values to drop.
 */
export function skip<T>(wait: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let rest = wait;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
      } else if (rest-- > 0) {
        talkback(TalkbackKind.Pull);
      } else {
        sink(signal);
      }
    });
  };
}

/**
 * Drops the source's values until `notifier` pushes its first value, then
 * closes the notifier and forwards all further source values.
 *
 * @param notifier - Source whose first push ends the skipping phase.
 */
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let skip = true;
    let pulled = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (skip) notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(signal => {
          if (signal === SignalKind.End) {
            if (skip) {
              // NOTE(review): the source is closed here but End is not sent to the sink — confirm intended.
              ended = true;
              sourceTalkback(TalkbackKind.Close);
            }
          } else if (signal.tag === SignalKind.Start) {
            (notifierTalkback = signal[0])(TalkbackKind.Pull);
          } else {
            skip = false;
            notifierTalkback(TalkbackKind.Close);
          }
        });
      } else if (!skip) {
        pulled = false;
        sink(signal);
      } else if (!pulled) {
        pulled = true;
        sourceTalkback(TalkbackKind.Pull);
        notifierTalkback(TalkbackKind.Pull);
      } else {
        pulled = false;
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          if (skip) notifierTalkback(TalkbackKind.Close);
        } else if (!ended && !pulled) {
          pulled = true;
          if (skip) notifierTalkback(TalkbackKind.Pull);
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Drops values while `predicate` returns `true`. The first value for which it
 * returns `false`, and every value after it, is forwarded; `predicate` is not
 * called again after that point.
 *
 * @param predicate - Returns `true` for values that should still be skipped.
 */
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let skip = true;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
      } else if (skip) {
        if (predicate(signal[0])) {
          talkback(TalkbackKind.Pull);
        } else {
          skip = false;
          sink(signal);
        }
      } else {
        sink(signal);
      }
    });
  };
}

/**
 * Maps every value of the outer source to an inner source and forwards only
 * the values of the most recent inner source: when a new outer value arrives,
 * the currently active inner source is closed before the new one is
 * subscribed to.
 *
 * @param map - Creates the inner source for an outer value.
 */
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;
    /** Subscribes to `innerSource` and makes it the active inner source. */
    function applyInnerSource(innerSource: Source<Out>): void {
      innerActive = true;
      innerSource(signal => {
        if (!innerActive) {
          /*noop*/
        } else if (signal === SignalKind.End) {
          innerActive = false;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
        } else if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          (innerTalkback = signal[0])(TalkbackKind.Pull);
        } else {
          sink(signal);
          if (!innerPulled) {
            innerTalkback(TalkbackKind.Pull);
          } else {
            innerPulled = false;
          }
        }
      });
    }
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive) sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
      } else {
        if (innerActive) {
          innerTalkback(TalkbackKind.Close);
          innerTalkback = talkbackPlaceholder;
        }
        if (!outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        } else {
          outerPulled = false;
        }
        applyInnerSource(map(signal[0]));
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
        } else {
          if (!ended && !outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          if (innerActive && !innerPulled) {
            innerPulled = true;
            innerTalkback(TalkbackKind.Pull);
          }
        }
      })
    );
  };
}

/** Flattens a source of sources with `switchMap`, using the identity mapper. */
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
  return switchMap<Source<T>, T>(identity)(source);
}

/**
 * Forwards at most `max` values, then sends End to the sink and closes the
 * source. With `max <= 0` the stream is ended as soon as the source starts.
 *
 * @param max - Maximum number of values to forward.
 */
export function take<T>(max: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;
    let taken = 0;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          ended = true;
          sink(SignalKind.End);
          signal[0](TalkbackKind.Close);
        } else {
          talkback = signal[0];
        }
      } else if (taken++ < max) {
        sink(signal);
        if (!ended && taken >= max) {
          ended = true;
          sink(SignalKind.End);
          talkback(TalkbackKind.Close);
        }
      } else {
        // NOTE(review): only reachable when a value arrives re-entrantly while the
        // max-th value is still being delivered above (`ended` not yet set) — confirm intended.
        sink(signal);
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          talkback(TalkbackKind.Close);
        } else if (signal === TalkbackKind.Pull && !ended && taken < max) {
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Keeps the last `max` values of the source in a queue, pulling the source
 * itself, and emits the queue through `fromArray` once the source ends. With
 * `max <= 0` the source is closed on Start and the empty queue is emitted.
 *
 * @param max - Maximum number of trailing values to keep.
 */
export function takeLast<T>(max: number): Operator<T, T> {
  return source => sink => {
    const queue: T[] = [];
    let talkback = talkbackPlaceholder;
    source(signal => {
      if (signal === SignalKind.End) {
        fromArray(queue)(sink);
      } else if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          signal[0](TalkbackKind.Close);
          fromArray(queue)(sink);
        } else {
          (talkback = signal[0])(TalkbackKind.Pull);
        }
      } else {
        // Drop the oldest entry once the queue holds `max` values.
        if (queue.length >= max && max) queue.shift();
        queue.push(signal[0]);
        talkback(TalkbackKind.Pull);
      }
    });
  };
}

/**
 * Forwards the source's values until `notifier` pushes a value; at that point
 * the source is closed and End is sent to the sink. The notifier ending on
 * its own is ignored.
 *
 * @param notifier - Source whose first push terminates the stream.
 */
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(signal => {
          if (signal === SignalKind.End) {
            /*noop*/
          } else if (signal.tag === SignalKind.Start) {
            (notifierTalkback = signal[0])(TalkbackKind.Pull);
          } else {
            ended = true;
            sourceTalkback(TalkbackKind.Close);
            sink(SignalKind.End);
          }
        });
      } else {
        sink(signal);
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!ended) {
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}

/**
 * Forwards values while `predicate` returns `true`. On the first value for
 * which it returns `false`, that value is dropped, End is sent to the sink
 * and the source is closed.
 *
 * @param predicate - Returns `true` for values that should be forwarded.
 */
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
      } else if (!predicate(signal[0])) {
        ended = true;
        sink(SignalKind.End);
        talkback(TalkbackKind.Close);
      } else {
        sink(signal);
      }
    });
  };
}

/**
 * Emits a value only after `timing(value)` milliseconds have passed without a
 * newer value arriving; every new value cancels the pending timer.
 *
 * If the source ends while a timer is pending, End is deferred until that
 * timer has fired and its value has been emitted.
 *
 * @param timing - Returns the debounce duration in milliseconds for a value.
 */
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let id: ReturnType<typeof setTimeout> | undefined;
    let deferredEnded = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (id) {
          deferredEnded = true;
        } else {
          sink(SignalKind.End);
        }
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close && !ended) {
              ended = true;
              deferredEnded = false;
              if (id) clearTimeout(id);
              talkback(TalkbackKind.Close);
            } else if (!ended) {
              talkback(TalkbackKind.Pull);
            }
          })
        );
      } else {
        if (id) clearTimeout(id);
        id = setTimeout(() => {
          id = undefined;
          sink(signal);
          if (deferredEnded) sink(SignalKind.End);
        }, timing(signal[0]));
      }
    });
  };
}

/**
 * Delays every signal except Start (i.e. pushed values and End) by `wait`
 * milliseconds using `setTimeout`. Start is forwarded immediately.
 *
 * @param wait - Delay in milliseconds.
 */
export function delay<T>(wait: number): Operator<T, T> {
  return source => sink => {
    // Number of delayed signals still waiting for their timer.
    // NOTE(review): nothing in this operator resets `active` when the sink closes — confirm intended.
    let active = 0;
    source(signal => {
      if (typeof signal !== 'number' && signal.tag === SignalKind.Start) {
        sink(signal);
      } else {
        active++;
        setTimeout(() => {
          if (active) {
            active--;
            sink(signal);
          }
        }, wait);
      }
    });
  };
}

/**
 * Forwards a value and then ignores further values until `timing(value)`
 * milliseconds have elapsed.
 *
 * The pending timer is cleared when the source ends or the sink sends Close.
 *
 * @param timing - Returns the throttle duration in milliseconds for a value.
 */
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let skip = false;
    let id: ReturnType<typeof setTimeout> | undefined;
    source(signal => {
      if (signal === SignalKind.End) {
        if (id) clearTimeout(id);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              if (id) clearTimeout(id);
              talkback(TalkbackKind.Close);
            } else {
              talkback(TalkbackKind.Pull);
            }
          })
        );
      } else if (!skip) {
        skip = true;
        if (id) clearTimeout(id);
        id = setTimeout(() => {
          id = undefined;
          skip = false;
        }, timing(signal[0]));
        sink(signal);
      }
    });
  };
}

// Alternative names: `flatten` is `mergeAll`, `tap` is `onPush`.
export { mergeAll as flatten, onPush as tap };