Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
open Wonka_types;
open Wonka_helpers;

/* Mutable state shared by the callbacks of `buffer`. */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Queues every value pushed by `source` and emits the queued values as an
   array each time `notifier` pushes. When either stream ends, the other one
   is closed, the current queue is flushed to the sink (even when it is
   empty) and End is sent. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started. */
          notifier((. signal) =>
            switch (signal) {
            | Start(tb) =>
              state.notifierTalkback = tb;
              state.notifierTalkback(. Pull);
            | Push(_) when !state.ended =>
              sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              /* Start a fresh queue for the next window. */
              state.buffer = Rebel.MutableQueue.make();
              state.notifierTalkback(. Pull);
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              sink(. End);
            | End => ()
            }
          );
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          state.sourceTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `combine`. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Emits a tuple of the latest values of `sourceA` and `sourceB` whenever
   either pushes, once both have produced at least one value. The first End
   received is only counted; the sink receives End when the second source
   ends. A Pull from the sink is forwarded to both sources, at most once
   until the next Push arrives (`gotSignal`). */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        /* B has not produced a value yet: remember A's value only. */
        state.lastValA = Some(a);
        state.gotSignal = false;
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        /* A has not produced a value yet: remember B's value only. */
        state.lastValB = Some(b);
        state.gotSignal = false;
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Mutable state shared by the callbacks of `concatMap`. */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable closed: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with `f` and runs
   the inner sources one at a time. Outer values that arrive while an inner
   source is active are queued and started when the active one ends. The sink
   receives End once the outer source has ended and no inner source is
   active or queued. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | End =>
            state.innerActive = false;
            state.innerTalkback = talkbackPlaceholder;

            /* Continue with the next queued input, if there is one. */
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None => ()
            };
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            tb(. Pull);
          | Push(x) when !state.closed =>
            sink(. Push(x));
            state.innerTalkback(. Pull);
          | Push(_) => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };

          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended) {
                state.innerTalkback(. Pull);
              }
            | Close =>
              state.innerTalkback(. Close);
              if (!state.ended) {
                state.ended = true;
                state.closed = true;
                state.outerTalkback(. Close);
                state.innerTalkback = talkbackPlaceholder;
              };
            },
        ),
      );
    })
  );

/* `concatMap` with the identity function: flattens a source of sources,
   one inner source at a time. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* Concatenates an array of sources by feeding them through `concatMap`. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the pushed values for which `f` returns true. For a rejected
   value a Pull is sent on the talkback supplied by `captureTalkback`
   instead. All other signals are forwarded unchanged. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      captureTalkback(source, (. signal, talkback) =>
        switch (signal) {
        | Push(x) when !f(. x) => talkback(. Pull)
        | _ => sink(. signal)
        }
      )
    )
  );

/* Applies `f` to every pushed value; Start and End pass through. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          switch (signal) {
          | Start(x) => Start(x)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      )
    )
  );

/* Mutable state shared by the callbacks of `mergeMap`. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with `f` and
   forwards the values of all inner sources as they arrive. The sink receives
   End once the outer source has ended and no inner talkback is registered
   any more. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | End =>
            /* Unregister this inner source's talkback. */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          /* An empty talkback list means the sink has closed. */
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          applyInnerSource(f(. x));
          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Close every registered inner source exactly once. */
              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Close)
              );
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              /* Always drop the talkbacks so that the Push guard above
                 discards anything an inner source emits after the sink
                 has closed. */
              state.innerTalkbacks = Rebel.Array.makeEmpty();
            | Pull when !state.ended =>
              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Pull)
              )
            | Pull => ()
            },
        ),
      );
    })
  );

/* Merges an array of sources by feeding them through `mergeMap`. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* `mergeMap` with the identity function: flattens a source of sources. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;

/* Calls `f` once when the stream terminates: either when the source sends
   End (after End was forwarded to the sink) or when the sink sends Close
   (before Close is forwarded to the source). */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          /* Wrap the talkback so that a Close from the sink is observed. */
          sink(.
            Start(
              (. signal) => {
                switch (signal) {
                | Close when ! ended^ =>
                  ended := true;
                  f(.);
                | Close
                | Pull => ()
                };

                talkback(. signal);
              },
            ),
          )
        | End =>
          if (! ended^) {
            ended := true;
            sink(. signal);
            f(.);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Calls `f` with every pushed value before forwarding the signal. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        switch (signal) {
        | Push(x) => f(. x)
        | _ => ()
        };

        sink(. signal);
      })
    )
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;

/* Calls `f` right after the Start signal has been forwarded to the sink. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Mutable state shared by the callbacks of `sample`. */
type sampleStateT('a) = {
  mutable ended: bool,
  mutable value: option('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Remembers the most recent value pushed by `source` and emits it when
   `notifier` pushes; the remembered value is cleared after it has been
   emitted, so each source value is emitted at most once. When either stream
   ends, the other one is closed and the sink receives End exactly once. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        ended: false,
        value: None,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        /* Already terminated by the notifier or closed by the sink. */
        | End => ()
        | Push(x) => state.value = Some(x)
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        /* Already terminated by the source or closed by the sink. */
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            /* Nothing is forwarded upstream once the stream has ended. */
            if (!state.ended) {
              switch (signal) {
              | Pull =>
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds pushed values into an accumulator starting at `seed` and pushes the
   updated accumulator after every value; Start and End pass through. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* Mutable state kept by `share` across all of its subscribers. */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Shares one subscription to `source` between several sinks. The source is
   subscribed to when the list of sinks grows to exactly one entry; every
   Push and End is broadcast to all registered sinks. A sink that sends Close
   is removed, and the source is closed when no sink is left. A Pull is
   forwarded at most once until the next Push arrives (`gotSignal`). */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Drops the first `wait` pushed values, sending a Pull for each dropped
   value instead; everything else is forwarded unchanged. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let rest = ref(wait);

      captureTalkback(source, (. signal, talkback) =>
        switch (signal) {
        | Push(_) when rest^ > 0 =>
          rest := rest^ - 1;
          talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `skipUntil`. */
type skipUntilStateT = {
  mutable skip: bool,
  mutable ended: bool,
  mutable gotSignal: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Drops the values of `source` until `notifier` pushes for the first time;
   the notifier is closed at that point and later source values are
   forwarded. While skipping, each dropped value triggers a Pull on the
   source. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        skip: true,
        ended: false,
        gotSignal: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started. */
          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
              tb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End => ()
            }
          );
        | Push(_) when state.skip && !state.ended =>
          state.sourceTalkback(. Pull)
        | Push(_) when !state.ended =>
          state.gotSignal = false;
          sink(. signal);
        | Push(_) => ()
        | End =>
          /* The notifier is still subscribed only while skipping. */
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (state.skip) {
                state.notifierTalkback(. Close);
              };
              state.ended = true;
              state.sourceTalkback(. Close);
            | Pull when !state.gotSignal && !state.ended =>
              state.gotSignal = true;
              state.sourceTalkback(. Pull);
            | Pull => ()
            },
        ),
      );
    })
  );

/* Drops pushed values while `f` returns true for them. The first value for
   which `f` returns false, and every value after it, is forwarded. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let skip = ref(true);

      captureTalkback(source, (. signal, talkback) =>
        switch (signal) {
        | Push(x) when skip^ =>
          if (f(. x)) {
            talkback(. Pull);
          } else {
            skip := false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `switchMap`. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable closed: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with `f`. When a
   new outer value arrives while an inner source is active, that inner source
   is closed and replaced by the new one. The sink receives End once the
   outer source has ended and no inner source is active. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      let applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | End =>
            state.innerActive = false;
            state.innerTalkback = talkbackPlaceholder;
            if (state.ended) {
              sink(. End);
            };
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            tb(. Pull);
          | Push(x) when !state.closed =>
            sink(. Push(x));
            state.innerTalkback(. Pull);
          | Push(_) => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          /* Cancel the running inner source before switching. */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };
          applyInnerSource(f(. x));
          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull => state.innerTalkback(. Pull)
            | Close =>
              state.innerTalkback(. Close);
              if (!state.ended) {
                state.ended = true;
                state.closed = true;
                state.outerTalkback(. Close);
                state.innerTalkback = talkbackPlaceholder;
              };
            },
        ),
      );
    })
  );

/* `switchMap` with the identity function. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Mutable state shared by the callbacks of `take`. */
type takeStateT = {
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` pushed values. After the `max`-th value the sink
   receives End and the source is closed. `taken` is set to `max` on any
   termination so that later signals are ignored. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {taken: 0, talkback: talkbackPlaceholder};

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max =>
          state.taken = state.taken + 1;
          sink(. signal);

          if (state.taken === max) {
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when state.taken < max =>
          state.taken = max;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (state.taken < max) {
              switch (signal) {
              | Pull => state.talkback(. Pull)
              | Close =>
                state.taken = max;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Pulls the whole source into a queue, dropping the oldest entry whenever
   the queue already holds `max` values (only when `max > 0`). When the
   source ends, the queue is handed to `makeTrampoline` together with the
   sink. NOTE(review): `makeTrampoline` comes from Wonka_helpers; presumably
   it replays the queued values to the sink - confirm. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let queue = MutableQueue.make();

      captureTalkback(source, (. signal, talkback) =>
        switch (signal) {
        | Start(_) => talkback(. Pull)
        | Push(x) =>
          let size = MutableQueue.size(queue);
          if (size >= max && max > 0) {
            ignore(MutableQueue.pop(queue));
          };

          MutableQueue.add(queue, x);
          talkback(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `takeUntil`. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the values of `source` until `notifier` pushes for the first
   time; at that point the source is closed and the sink receives End. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started. */
          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.ended = true;
              state.sourceTalkback(. Close);
              sink(. End);
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Forwards pushed values while `f` returns true for them. On the first
   value for which `f` returns false the sink receives End and the source is
   closed; that value is not forwarded. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);
      let talkback = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        /* Only remember the source's talkback here. The sink is greeted
           once, below, with a talkback that respects `ended`. */
        | Start(tb) => talkback := tb
        | End when ! ended^ =>
          ended := true;
          sink(. End);
        | End => ()
        | Push(x) when ! ended^ =>
          if (!f(. x)) {
            ended := true;
            sink(. End);
            talkback^(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (! ended^) {
              switch (signal) {
              | Pull => talkback^(. Pull)
              | Close =>
                ended := true;
                talkback^(. Close);
              };
            },
        ),
      );
    })
  );