Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3import { fromArray } from './sources'; 4 5const identity = <T>(x: T): T => x; 6 7export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> { 8 return source => sink => { 9 let buffer: T[] = []; 10 let sourceTalkback = talkbackPlaceholder; 11 let notifierTalkback = talkbackPlaceholder; 12 let pulled = false; 13 let ended = false; 14 source(signal => { 15 if (ended) { 16 /*noop*/ 17 } else if (signal === SignalKind.End) { 18 ended = true; 19 notifierTalkback(TalkbackKind.Close); 20 if (buffer.length) sink(push(buffer)); 21 sink(SignalKind.End); 22 } else if (signal.tag === SignalKind.Start) { 23 sourceTalkback = signal[0]; 24 notifier(signal => { 25 if (ended) { 26 /*noop*/ 27 } else if (signal === SignalKind.End) { 28 ended = true; 29 sourceTalkback(TalkbackKind.Close); 30 if (buffer.length) sink(push(buffer)); 31 sink(SignalKind.End); 32 } else if (signal.tag === SignalKind.Start) { 33 notifierTalkback = signal[0]; 34 } else if (buffer.length) { 35 const signal = push(buffer); 36 buffer = []; 37 sink(signal); 38 } 39 }); 40 } else { 41 buffer.push(signal[0]); 42 if (!pulled) { 43 pulled = true; 44 sourceTalkback(TalkbackKind.Pull); 45 notifierTalkback(TalkbackKind.Pull); 46 } else { 47 pulled = false; 48 } 49 } 50 }); 51 sink( 52 start(signal => { 53 if (signal === TalkbackKind.Close && !ended) { 54 ended = true; 55 sourceTalkback(TalkbackKind.Close); 56 notifierTalkback(TalkbackKind.Close); 57 } else if (!ended && !pulled) { 58 pulled = true; 59 sourceTalkback(TalkbackKind.Pull); 60 notifierTalkback(TalkbackKind.Pull); 61 } 62 }) 63 ); 64 }; 65} 66 67export function combine<A, B>(sourceA: Source<A>, sourceB: Source<B>): Source<[A, B]> { 68 return sink => { 69 let lastValA: A | void; 70 let lastValB: B | void; 71 let talkbackA = talkbackPlaceholder; 72 let talkbackB = talkbackPlaceholder; 73 let gotSignal = false; 74 let gotEnd = false; 75 let ended = false; 76 sourceA(signal => { 77 if (signal === SignalKind.End) { 78 if (!gotEnd) { 79 gotEnd = true; 80 } else { 81 ended = true; 82 sink(SignalKind.End); 83 } 84 } else if (signal.tag === SignalKind.Start) { 85 talkbackA = signal[0]; 86 } else if (lastValB === undefined) { 87 lastValA = signal[0]; 88 if (!gotSignal) { 89 talkbackB(TalkbackKind.Pull); 90 } else { 91 gotSignal = false; 92 } 93 } else if (!ended) { 94 lastValA = signal[0]; 95 gotSignal = false; 96 sink(push([lastValA, lastValB] as [A, B])); 97 } 98 }); 99 sourceB(signal => { 100 if (signal === SignalKind.End) { 101 if (!gotEnd) { 102 gotEnd = true; 103 } else { 104 ended = true; 105 sink(SignalKind.End); 106 } 107 } else if (signal.tag === SignalKind.Start) { 108 talkbackB = signal[0]; 109 } else if (lastValA === undefined) { 110 lastValB = signal[0]; 111 if (!gotSignal) { 112 talkbackA(TalkbackKind.Pull); 113 } else { 114 gotSignal = false; 115 } 116 } else if (!ended) { 117 lastValB = signal[0]; 118 gotSignal = false; 119 sink(push([lastValA, lastValB] as [A, B])); 120 } 121 }); 122 sink( 123 start(signal => { 124 if (ended) { 125 /*noop*/ 126 } else if (signal === TalkbackKind.Close) { 127 ended = true; 128 talkbackA(TalkbackKind.Close); 129 talkbackB(TalkbackKind.Close); 130 } else if (!gotSignal) { 131 gotSignal = true; 132 talkbackA(TalkbackKind.Pull); 133 talkbackB(TalkbackKind.Pull); 134 } 135 }) 136 ); 137 }; 138} 139 140export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 141 return source => sink => { 142 const inputQueue: In[] = []; 143 let outerTalkback = talkbackPlaceholder; 144 let innerTalkback = talkbackPlaceholder; 145 let outerPulled = false; 146 let innerPulled = false; 147 let innerActive = false; 148 let ended = false; 149 function applyInnerSource(innerSource: Source<Out>): void { 150 innerActive = true; 151 innerSource(signal => { 152 if (signal === SignalKind.End) { 153 if (innerActive) { 154 innerActive = false; 155 if (inputQueue.length) { 156 applyInnerSource(map(inputQueue.shift()!)); 157 } else if (ended) { 158 sink(SignalKind.End); 159 } else if (!outerPulled) { 160 outerPulled = true; 161 outerTalkback(TalkbackKind.Pull); 162 } 163 } 164 } else if (signal.tag === SignalKind.Start) { 165 innerPulled = false; 166 (innerTalkback = signal[0])(TalkbackKind.Pull); 167 } else if (innerActive) { 168 sink(signal); 169 if (innerPulled) { 170 innerPulled = false; 171 } else { 172 innerTalkback(TalkbackKind.Pull); 173 } 174 } 175 }); 176 } 177 source(signal => { 178 if (ended) { 179 /*noop*/ 180 } else if (signal === SignalKind.End) { 181 ended = true; 182 if (!innerActive && !inputQueue.length) sink(SignalKind.End); 183 } else if (signal.tag === SignalKind.Start) { 184 outerTalkback = signal[0]; 185 } else { 186 outerPulled = false; 187 if (innerActive) { 188 inputQueue.push(signal[0]); 189 } else { 190 applyInnerSource(map(signal[0])); 191 } 192 } 193 }); 194 sink( 195 start(signal => { 196 if (signal === TalkbackKind.Close) { 197 if (!ended) { 198 ended = true; 199 outerTalkback(TalkbackKind.Close); 200 } 201 if (innerActive) { 202 innerActive = false; 203 innerTalkback(TalkbackKind.Close); 204 } 205 } else { 206 if (!ended && !outerPulled) { 207 outerPulled = true; 208 outerTalkback(TalkbackKind.Pull); 209 } 210 if (innerActive && !innerPulled) { 211 innerPulled = true; 212 innerTalkback(TalkbackKind.Pull); 213 } 214 } 215 }) 216 ); 217 }; 218} 219 220export function concatAll<T>(source: Source<Source<T>>): Source<T> { 221 return concatMap<Source<T>, T>(identity)(source); 222} 223 224export function concat<T>(sources: Source<T>[]): Source<T> { 225 return concatAll(fromArray(sources)); 226} 227 228export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> { 229 return source => sink => { 230 let talkback = talkbackPlaceholder; 231 source(signal => { 232 if (signal === SignalKind.End) { 233 sink(SignalKind.End); 234 } else if (signal.tag === SignalKind.Start) { 235 talkback = signal[0]; 236 sink(signal); 237 } else if (!predicate(signal[0])) { 238 talkback(TalkbackKind.Pull); 239 } else { 240 sink(signal); 241 } 242 }); 243 }; 244} 245 246export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> { 247 return source => sink => 248 source(signal => { 249 if (signal === SignalKind.End || signal.tag === SignalKind.Start) { 250 sink(signal); 251 } else { 252 sink(push(map(signal[0]))); 253 } 254 }); 255} 256 257export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 258 return source => sink => { 259 let innerTalkbacks: TalkbackFn[] = []; 260 let outerTalkback = talkbackPlaceholder; 261 let outerPulled = false; 262 let ended = false; 263 function applyInnerSource(innerSource: Source<Out>): void { 264 let talkback = talkbackPlaceholder; 265 innerSource(signal => { 266 if (signal === SignalKind.End) { 267 if (innerTalkbacks.length) { 268 const index = innerTalkbacks.indexOf(talkback); 269 if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1); 270 if (!innerTalkbacks.length) { 271 if (ended) { 272 sink(SignalKind.End); 273 } else if (!outerPulled) { 274 outerPulled = true; 275 outerTalkback(TalkbackKind.Pull); 276 } 277 } 278 } 279 } else if (signal.tag === SignalKind.Start) { 280 innerTalkbacks.push((talkback = signal[0])); 281 talkback(TalkbackKind.Pull); 282 } else if (innerTalkbacks.length) { 283 sink(signal); 284 talkback(TalkbackKind.Pull); 285 } 286 }); 287 } 288 source(signal => { 289 if (ended) { 290 /*noop*/ 291 } else if (signal === SignalKind.End) { 292 ended = true; 293 if (!innerTalkbacks.length) sink(SignalKind.End); 294 } else if (signal.tag === SignalKind.Start) { 295 outerTalkback = signal[0]; 296 } else { 297 outerPulled = false; 298 applyInnerSource(map(signal[0])); 299 if (!outerPulled) { 300 outerPulled = true; 301 outerTalkback(TalkbackKind.Pull); 302 } 303 } 304 }); 305 sink( 306 start(signal => { 307 if (signal === TalkbackKind.Close) { 308 if (!ended) { 309 ended = true; 310 outerTalkback(TalkbackKind.Close); 311 } 312 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 313 a[i](TalkbackKind.Close); 314 innerTalkbacks.length = 0; 315 } else { 316 if (!ended && !outerPulled) { 317 outerPulled = true; 318 outerTalkback(TalkbackKind.Pull); 319 } else { 320 outerPulled = false; 321 } 322 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) 323 a[i](TalkbackKind.Pull); 324 } 325 }) 326 ); 327 }; 328} 329 330export function mergeAll<T>(source: Source<Source<T>>): Source<T> { 331 return mergeMap<Source<T>, T>(identity)(source); 332} 333 334export function merge<T>(sources: Source<T>[]): Source<T> { 335 return mergeAll(fromArray(sources)); 336} 337 338export function onEnd<T>(callback: () => void): Operator<T, T> { 339 return source => sink => { 340 let ended = false; 341 source(signal => { 342 if (ended) { 343 /*noop*/ 344 } else if (signal === SignalKind.End) { 345 ended = true; 346 sink(SignalKind.End); 347 callback(); 348 } else if (signal.tag === SignalKind.Start) { 349 const talkback = signal[0]; 350 sink( 351 start(signal => { 352 if (signal === TalkbackKind.Close) { 353 ended = true; 354 talkback(TalkbackKind.Close); 355 callback(); 356 } else { 357 talkback(signal); 358 } 359 }) 360 ); 361 } else { 362 sink(signal); 363 } 364 }); 365 }; 366} 367 368export function onPush<T>(callback: (value: T) => void): Operator<T, T> { 369 return source => sink => { 370 let ended = false; 371 source(signal => { 372 if (ended) { 373 /*noop*/ 374 } else if (signal === SignalKind.End) { 375 ended = true; 376 sink(SignalKind.End); 377 } else if (signal.tag === SignalKind.Start) { 378 const talkback = signal[0]; 379 sink( 380 start(signal => { 381 if (signal === TalkbackKind.Close) ended = true; 382 talkback(signal); 383 }) 384 ); 385 } else { 386 callback(signal[0]); 387 sink(signal); 388 } 389 }); 390 }; 391} 392 393export function onStart<T>(callback: () => void): Operator<T, T> { 394 return source => sink => 395 source(signal => { 396 if (signal === SignalKind.End) { 397 sink(SignalKind.End); 398 } else if (signal.tag === SignalKind.Start) { 399 sink(signal); 400 callback(); 401 } else { 402 sink(signal); 403 } 404 }); 405} 406 407export function sample<S, T>(notifier: Source<S>): Operator<T, T> { 408 return source => sink => { 409 let sourceTalkback = talkbackPlaceholder; 410 let notifierTalkback = talkbackPlaceholder; 411 let value: T | void; 412 let pulled = false; 413 let ended = false; 414 source(signal => { 415 if (ended) { 416 /*noop*/ 417 } else if (signal === SignalKind.End) { 418 ended = true; 419 notifierTalkback(TalkbackKind.Close); 420 sink(SignalKind.End); 421 } else if (signal.tag === SignalKind.Start) { 422 sourceTalkback = signal[0]; 423 } else { 424 value = signal[0]; 425 if (!pulled) { 426 pulled = true; 427 notifierTalkback(TalkbackKind.Pull); 428 sourceTalkback(TalkbackKind.Pull); 429 } else { 430 pulled = false; 431 } 432 } 433 }); 434 notifier(signal => { 435 if (ended) { 436 /*noop*/ 437 } else if (signal === SignalKind.End) { 438 ended = true; 439 sourceTalkback(TalkbackKind.Close); 440 sink(SignalKind.End); 441 } else if (signal.tag === SignalKind.Start) { 442 notifierTalkback = signal[0]; 443 } else if (value !== undefined) { 444 const signal = push(value); 445 value = undefined; 446 sink(signal); 447 } 448 }); 449 sink( 450 start(signal => { 451 if (signal === TalkbackKind.Close && !ended) { 452 ended = true; 453 sourceTalkback(TalkbackKind.Close); 454 notifierTalkback(TalkbackKind.Close); 455 } else if (!ended && !pulled) { 456 pulled = true; 457 sourceTalkback(TalkbackKind.Pull); 458 notifierTalkback(TalkbackKind.Pull); 459 } 460 }) 461 ); 462 }; 463} 464 465export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> { 466 return source => sink => { 467 let acc = seed; 468 source(signal => { 469 if (signal === SignalKind.End) { 470 sink(SignalKind.End); 471 } else if (signal.tag === SignalKind.Start) { 472 sink(signal); 473 } else { 474 sink(push((acc = reducer(acc, signal[0])))); 475 } 476 }); 477 }; 478} 479 480export function share<T>(source: Source<T>): Source<T> { 481 let sinks: Sink<T>[] = []; 482 let talkback = talkbackPlaceholder; 483 let gotSignal = false; 484 return sink => { 485 sinks.push(sink); 486 if (sinks.length === 1) { 487 source(signal => { 488 if (signal === SignalKind.End) { 489 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End); 490 sinks.length = 0; 491 } else if (signal.tag === SignalKind.Start) { 492 talkback = signal[0]; 493 } else { 494 gotSignal = false; 495 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal); 496 } 497 }); 498 } 499 sink( 500 start(signal => { 501 if (signal === TalkbackKind.Close) { 502 const index = sinks.indexOf(sink); 503 if (index > -1) (sinks = sinks.slice()).splice(index, 1); 504 if (!sinks.length) talkback(TalkbackKind.Close); 505 } else if (!gotSignal) { 506 gotSignal = true; 507 talkback(TalkbackKind.Pull); 508 } 509 }) 510 ); 511 }; 512} 513 514export function skip<T>(wait: number): Operator<T, T> { 515 return source => sink => { 516 let talkback = talkbackPlaceholder; 517 let rest = wait; 518 source(signal => { 519 if (signal === SignalKind.End) { 520 sink(SignalKind.End); 521 } else if (signal.tag === SignalKind.Start) { 522 talkback = signal[0]; 523 sink(signal); 524 } else if (rest-- > 0) { 525 talkback(TalkbackKind.Pull); 526 } else { 527 sink(signal); 528 } 529 }); 530 }; 531} 532 533export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> { 534 return source => sink => { 535 let sourceTalkback = talkbackPlaceholder; 536 let notifierTalkback = talkbackPlaceholder; 537 let skip = true; 538 let pulled = false; 539 let ended = false; 540 source(signal => { 541 if (ended) { 542 /*noop*/ 543 } else if (signal === SignalKind.End) { 544 ended = true; 545 if (skip) notifierTalkback(TalkbackKind.Close); 546 sink(SignalKind.End); 547 } else if (signal.tag === SignalKind.Start) { 548 sourceTalkback = signal[0]; 549 notifier(signal => { 550 if (signal === SignalKind.End) { 551 if (skip) { 552 ended = true; 553 sourceTalkback(TalkbackKind.Close); 554 } 555 } else if (signal.tag === SignalKind.Start) { 556 (notifierTalkback = signal[0])(TalkbackKind.Pull); 557 } else { 558 skip = false; 559 notifierTalkback(TalkbackKind.Close); 560 } 561 }); 562 } else if (!skip) { 563 pulled = false; 564 sink(signal); 565 } else if (!pulled) { 566 pulled = true; 567 sourceTalkback(TalkbackKind.Pull); 568 notifierTalkback(TalkbackKind.Pull); 569 } else { 570 pulled = false; 571 } 572 }); 573 sink( 574 start(signal => { 575 if (signal === TalkbackKind.Close && !ended) { 576 ended = true; 577 sourceTalkback(TalkbackKind.Close); 578 if (skip) notifierTalkback(TalkbackKind.Close); 579 } else if (!ended && !pulled) { 580 pulled = true; 581 if (skip) notifierTalkback(TalkbackKind.Pull); 582 sourceTalkback(TalkbackKind.Pull); 583 } 584 }) 585 ); 586 }; 587} 588 589export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 590 return source => sink => { 591 let talkback = talkbackPlaceholder; 592 let skip = true; 593 source(signal => { 594 if (signal === SignalKind.End) { 595 sink(SignalKind.End); 596 } else if (signal.tag === SignalKind.Start) { 597 talkback = signal[0]; 598 sink(signal); 599 } else if (skip) { 600 if (predicate(signal[0])) { 601 talkback(TalkbackKind.Pull); 602 } else { 603 skip = false; 604 sink(signal); 605 } 606 } else { 607 sink(signal); 608 } 609 }); 610 }; 611} 612 613export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> { 614 return source => sink => { 615 let outerTalkback = talkbackPlaceholder; 616 let innerTalkback = talkbackPlaceholder; 617 let outerPulled = false; 618 let innerPulled = false; 619 let innerActive = false; 620 let ended = false; 621 function applyInnerSource(innerSource: Source<Out>): void { 622 innerActive = true; 623 innerSource(signal => { 624 if (!innerActive) { 625 /*noop*/ 626 } else if (signal === SignalKind.End) { 627 innerActive = false; 628 if (ended) { 629 sink(SignalKind.End); 630 } else if (!outerPulled) { 631 outerPulled = true; 632 outerTalkback(TalkbackKind.Pull); 633 } 634 } else if (signal.tag === SignalKind.Start) { 635 innerPulled = false; 636 (innerTalkback = signal[0])(TalkbackKind.Pull); 637 } else { 638 sink(signal); 639 if (!innerPulled) { 640 innerTalkback(TalkbackKind.Pull); 641 } else { 642 innerPulled = false; 643 } 644 } 645 }); 646 } 647 source(signal => { 648 if (ended) { 649 /*noop*/ 650 } else if (signal === SignalKind.End) { 651 ended = true; 652 if (!innerActive) sink(SignalKind.End); 653 } else if (signal.tag === SignalKind.Start) { 654 outerTalkback = signal[0]; 655 } else { 656 if (innerActive) { 657 innerTalkback(TalkbackKind.Close); 658 innerTalkback = talkbackPlaceholder; 659 } 660 if (!outerPulled) { 661 outerPulled = true; 662 outerTalkback(TalkbackKind.Pull); 663 } else { 664 outerPulled = false; 665 } 666 applyInnerSource(map(signal[0])); 667 } 668 }); 669 sink( 670 start(signal => { 671 if (signal === TalkbackKind.Close) { 672 if (!ended) { 673 ended = true; 674 outerTalkback(TalkbackKind.Close); 675 } 676 if (innerActive) { 677 innerActive = false; 678 innerTalkback(TalkbackKind.Close); 679 } 680 } else { 681 if (!ended && !outerPulled) { 682 outerPulled = true; 683 outerTalkback(TalkbackKind.Pull); 684 } 685 if (innerActive && !innerPulled) { 686 innerPulled = true; 687 innerTalkback(TalkbackKind.Pull); 688 } 689 } 690 }) 691 ); 692 }; 693} 694 695export function switchAll<T>(source: Source<Source<T>>): Source<T> { 696 return switchMap<Source<T>, T>(identity)(source); 697} 698 699export function take<T>(max: number): Operator<T, T> { 700 return source => sink => { 701 let talkback = talkbackPlaceholder; 702 let ended = false; 703 let taken = 0; 704 source(signal => { 705 if (ended) { 706 /*noop*/ 707 } else if (signal === SignalKind.End) { 708 ended = true; 709 sink(SignalKind.End); 710 } else if (signal.tag === SignalKind.Start) { 711 if (max <= 0) { 712 ended = true; 713 sink(SignalKind.End); 714 signal[0](TalkbackKind.Close); 715 } else { 716 talkback = signal[0]; 717 } 718 } else if (taken++ < max) { 719 sink(signal); 720 if (!ended && taken >= max) { 721 ended = true; 722 sink(SignalKind.End); 723 talkback(TalkbackKind.Close); 724 } 725 } else { 726 sink(signal); 727 } 728 }); 729 sink( 730 start(signal => { 731 if (signal === TalkbackKind.Close && !ended) { 732 ended = true; 733 talkback(TalkbackKind.Close); 734 } else if (signal === TalkbackKind.Pull && !ended && taken < max) { 735 talkback(TalkbackKind.Pull); 736 } 737 }) 738 ); 739 }; 740} 741 742export function takeLast<T>(max: number): Operator<T, T> { 743 return source => sink => { 744 const queue: T[] = []; 745 let talkback = talkbackPlaceholder; 746 source(signal => { 747 if (signal === SignalKind.End) { 748 fromArray(queue)(sink); 749 } else if (signal.tag === SignalKind.Start) { 750 if (max <= 0) { 751 signal[0](TalkbackKind.Close); 752 fromArray(queue)(sink); 753 } else { 754 (talkback = signal[0])(TalkbackKind.Pull); 755 } 756 } else { 757 if (queue.length >= max && max) queue.shift(); 758 queue.push(signal[0]); 759 talkback(TalkbackKind.Pull); 760 } 761 }); 762 }; 763} 764 765export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> { 766 return source => sink => { 767 let sourceTalkback = talkbackPlaceholder; 768 let notifierTalkback = talkbackPlaceholder; 769 let ended = false; 770 source(signal => { 771 if (ended) { 772 /*noop*/ 773 } else if (signal === SignalKind.End) { 774 ended = true; 775 notifierTalkback(TalkbackKind.Close); 776 sink(SignalKind.End); 777 } else if (signal.tag === SignalKind.Start) { 778 sourceTalkback = signal[0]; 779 notifier(signal => { 780 if (signal === SignalKind.End) { 781 /*noop*/ 782 } else if (signal.tag === SignalKind.Start) { 783 (notifierTalkback = signal[0])(TalkbackKind.Pull); 784 } else { 785 ended = true; 786 sourceTalkback(TalkbackKind.Close); 787 sink(SignalKind.End); 788 } 789 }); 790 } else { 791 sink(signal); 792 } 793 }); 794 sink( 795 start(signal => { 796 if (signal === TalkbackKind.Close && !ended) { 797 ended = true; 798 sourceTalkback(TalkbackKind.Close); 799 notifierTalkback(TalkbackKind.Close); 800 } else if (!ended) { 801 sourceTalkback(TalkbackKind.Pull); 802 } 803 }) 804 ); 805 }; 806} 807 808export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> { 809 return source => sink => { 810 let talkback = talkbackPlaceholder; 811 let ended = false; 812 source(signal => { 813 if (ended) { 814 /*noop*/ 815 } else if (signal === SignalKind.End) { 816 ended = true; 817 sink(SignalKind.End); 818 } else if (signal.tag === SignalKind.Start) { 819 talkback = signal[0]; 820 sink(signal); 821 } else if (!predicate(signal[0])) { 822 ended = true; 823 sink(SignalKind.End); 824 talkback(TalkbackKind.Close); 825 } else { 826 sink(signal); 827 } 828 }); 829 }; 830} 831 832export function debounce<T>(timing: (value: T) => number): Operator<T, T> { 833 return source => sink => { 834 let id: any | void; 835 let deferredEnded = false; 836 let ended = false; 837 source(signal => { 838 if (ended) { 839 /*noop*/ 840 } else if (signal === SignalKind.End) { 841 ended = true; 842 if (id) { 843 deferredEnded = true; 844 } else { 845 sink(SignalKind.End); 846 } 847 } else if (signal.tag === SignalKind.Start) { 848 const talkback = signal[0]; 849 sink( 850 start(signal => { 851 if (signal === TalkbackKind.Close && !ended) { 852 ended = true; 853 deferredEnded = false; 854 if (id) clearTimeout(id); 855 talkback(TalkbackKind.Close); 856 } else if (!ended) { 857 talkback(TalkbackKind.Pull); 858 } 859 }) 860 ); 861 } else { 862 if (id) clearTimeout(id); 863 id = setTimeout(() => { 864 id = undefined; 865 sink(signal); 866 if (deferredEnded) sink(SignalKind.End); 867 }, timing(signal[0])); 868 } 869 }); 870 }; 871} 872 873export function delay<T>(wait: number): Operator<T, T> { 874 return source => sink => { 875 let active = 0; 876 source(signal => { 877 if (typeof signal !== 'number' && signal.tag === SignalKind.Start) { 878 sink(signal); 879 } else { 880 active++; 881 setTimeout(() => { 882 if (active) { 883 active--; 884 sink(signal); 885 } 886 }, wait); 887 } 888 }); 889 }; 890} 891 892export function throttle<T>(timing: (value: T) => number): Operator<T, T> { 893 return source => sink => { 894 let skip = false; 895 let id: any | void; 896 source(signal => { 897 if (signal === SignalKind.End) { 898 if (id) clearTimeout(id); 899 sink(SignalKind.End); 900 } else if (signal.tag === SignalKind.Start) { 901 const talkback = signal[0]; 902 sink( 903 start(signal => { 904 if (signal === TalkbackKind.Close) { 905 if (id) clearTimeout(id); 906 talkback(TalkbackKind.Close); 907 } else { 908 talkback(TalkbackKind.Pull); 909 } 910 }) 911 ); 912 } else if (!skip) { 913 skip = true; 914 if (id) clearTimeout(id); 915 id = setTimeout(() => { 916 id = undefined; 917 skip = false; 918 }, timing(signal[0])); 919 sink(signal); 920 } 921 }); 922 }; 923} 924 925export { mergeAll as flatten, onPush as tap };