Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
/* Stream operators.
 *
 * Conventions visible throughout this module:
 * - A source is a function that accepts a sink; a sink is an uncurried
 *   callback that receives Start(talkback), Push(value) or End.
 * - A talkback is an uncurried callback that receives Pull or Close and is
 *   how a sink asks its source for more values or cancels it.
 * - Operators keep their per-subscription state in a mutable record that is
 *   created inside the curried sink function, so every subscription gets its
 *   own state (share is the exception: its state is shared on purpose).
 */
open Wonka_types;
open Wonka_helpers;

/* Per-subscription state of buffer.
 * - buffer: values received from the source since the last flush
 * - sourceTalkback / notifierTalkback: talkbacks of the two upstreams
 * - pulled: true while a Pull from the sink is outstanding
 * - ended: true once the operator has ended or has been closed
 */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Collects the values pushed by the source and emits them as one array each
 * time the notifier pushes. Nothing is emitted for an empty buffer.
 *
 * When either the source or the notifier ends, the other one is closed, any
 * remaining buffered values are flushed as a final array, and End is sent.
 * The notifier is subscribed from within the Start handler of the source.
 */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(tb) => state.notifierTalkback = tb
            | Push(_) when !state.ended =>
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                /* Swap in a fresh queue before emitting so that values
                   pushed re-entrantly land in the next batch. */
                let buffer = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(buffer)));
              }
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              };
              sink(. End);
            | End => ()
            }
          );
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          /* The outstanding Pull has been answered; allow the next one. */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          if (Rebel.MutableQueue.size(state.buffer) > 0) {
            sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          };
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Per-subscription state of combine.
 * - talkbackA / talkbackB: talkbacks of the two sources
 * - lastValA / lastValB: most recent value of each source, None until the
 *   source has pushed for the first time
 * - gotSignal: true while a Pull from the sink is outstanding
 * - endCounter: number of sources that have ended so far
 * - ended: true once End was sent or the sink closed the subscription
 */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Combines two sources into a source of tuples. Once both sources have
 * pushed at least one value, every further push of either source emits the
 * tuple of the latest values of both. End is sent only after both sources
 * have ended.
 */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        /* B has no value yet: remember a and make sure B gets pulled. */
        state.lastValA = Some(a);
        if (!state.gotSignal) {
          state.talkbackB(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        /* First of the two sources to end: keep going. */
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        /* A has no value yet: remember b and make sure A gets pulled. */
        state.lastValB = Some(b);
        if (!state.gotSignal) {
          state.talkbackA(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Per-subscription state of concatMap.
 * - inputQueue: outer values waiting for the active inner source to end
 * - outerTalkback / innerTalkback: talkbacks of the outer source and of the
 *   currently active inner source
 * - outerPulled / innerPulled: true while a Pull to that upstream is
 *   outstanding
 * - innerActive: true while an inner source is subscribed and has not ended
 * - ended: true once the outer source ended or the sink closed
 */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with f and
 * forwards the values of the inner sources one source at a time. Outer
 * values that arrive while an inner source is active are queued and started
 * in order as each inner source ends.
 *
 * End is sent once the outer source has ended, no inner source is active and
 * the queue is empty.
 */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      /* Subscribes to one inner source; recursive because the End of an
         inner source starts the next queued one. */
      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            /* Keep the inner source flowing unless the sink already pulled
               it while handling the value above. */
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None => ()
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );

/* Flattens a source of sources by running the inner sources one after
   another (concatMap with the identity function). */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* Concatenates an array of sources: each source is run to completion before
   the next one is started. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the values for which f returns true. For a rejected value a
 * Pull is sent upstream instead, so that the sink still receives a value for
 * its request. Start and End are forwarded unchanged.
 */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let talkback = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          talkback := tb;
          sink(. signal);
        | Push(x) when !f(. x) => talkback^(. Pull)
        | _ => sink(. signal)
        }
      );
    })
  );

/* Applies f to every pushed value and forwards the result. Start and End
   are forwarded unchanged. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) => Push(f(. x))
          | _ => signal
          },
        )
      )
    )
  );

/* Per-subscription state of mergeMap.
 * - outerTalkback: talkback of the outer source
 * - outerPulled: true while a Pull to the outer source is outstanding
 * - innerTalkbacks: talkbacks of all inner sources that have not ended
 * - ended: true once the outer source ended or the sink closed
 */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with f and
 * forwards the values of all inner sources as they arrive. Inner sources are
 * subscribed immediately and run concurrently.
 *
 * End is sent once the outer source has ended and no inner source remains.
 */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        /* Talkback of this particular inner source; also used to find and
           remove it from innerTalkbacks when it ends. */
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                /* Clear the list before closing so that the inner handlers
                   see an empty list and ignore any late signals. */
                let tbs = state.innerTalkbacks;
                state.ended = true;
                state.innerTalkbacks = Rebel.Array.makeEmpty();
                state.outerTalkback(. signal);
                Rebel.Array.forEach(tbs, tb => tb(. signal));
              | Pull =>
                if (!state.outerPulled) {
                  state.outerPulled = true;
                  state.outerTalkback(. Pull);
                };

                Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
              };
            },
        ),
      );
    })
  );

/* Merges an array of sources into one source that forwards the values of
   all of them as they arrive. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* Flattens a source of sources by subscribing to every inner source as soon
   as it arrives (mergeMap with the identity function). */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of mergeAll. */
[@genType]
let flatten = mergeAll;

/* Calls f when the stream terminates: after End has been forwarded to the
 * sink, or before a Close from the sink is forwarded to the source. All
 * signals are passed through unchanged.
 */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          /* Wrap the talkback so that a Close from the sink is observed. */
          sink(.
            Start(
              (. signal) => {
                switch (signal) {
                | Close => f(.)
                | _ => ()
                };
                talkback(. signal);
              },
            ),
          )
        | End =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    })
  );

/* Calls f with every pushed value before the value is forwarded. All
   signals are passed through unchanged. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        switch (signal) {
        | Push(x) => f(. x)
        | _ => ()
        };

        sink(. signal);
      })
    )
  );

/* Alias of onPush. */
[@genType]
let tap = onPush;

/* Calls f right after the Start signal has been forwarded to the sink. All
   signals are passed through unchanged. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Per-subscription state of sample.
 * - sourceTalkback / notifierTalkback: talkbacks of the two upstreams
 * - value: latest source value that has not been emitted yet
 * - pulled: true while a Pull from the sink is outstanding
 * - ended: true once the operator has ended or has been closed
 */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable value: option('a),
  mutable pulled: bool,
  mutable ended: bool,
};

/* Remembers the latest value of the source and emits it each time the
 * notifier pushes. A value is emitted at most once; notifier pushes that
 * arrive while no new value is stored are ignored.
 *
 * When either the source or the notifier ends, the other one is closed and
 * End is sent.
 */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) when !state.ended =>
          state.value = Some(x);
          /* The outstanding Pull has been answered. Without this reset the
             flag stays true forever and every later Pull from the sink is
             dropped, which stalls pull-driven sources after one value. */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          /* Clear the stored value first so it cannot be emitted twice. */
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds the pushed values with f, starting from seed, and emits every
 * intermediate accumulator value. The seed itself is not emitted. Start and
 * End are forwarded unchanged.
 */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* State of share; one record per call to share, used by all of its sinks.
 * - sinks: sinks that are currently subscribed
 * - talkback: talkback of the single upstream subscription
 * - gotSignal: true while a Pull to the source is outstanding
 */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Shares one subscription to source between all sinks. The source is
 * subscribed when the sink list goes from empty to one entry, every Push and
 * End is forwarded to all current sinks, and the source is closed when the
 * last sink closes.
 */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    /* Only the first sink triggers the upstream subscription. */
    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Per-subscription state of skip.
 * - talkback: talkback of the source
 * - rest: number of values that still have to be dropped
 */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable rest: int,
};

/* Drops the first wait values of the source and forwards everything after
 * them. For every dropped value a Pull is sent upstream. A wait of zero or
 * less drops nothing.
 */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) when state.rest > 0 =>
          state.rest = state.rest - 1;
          state.talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Per-subscription state of skipUntil.
 * - sourceTalkback / notifierTalkback: talkbacks of the two upstreams
 * - skip: true until the notifier has pushed for the first time
 * - pulled: true while a Pull from the sink is outstanding
 * - ended: true once the operator has ended or has been closed
 */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Drops the values of the source until the notifier pushes for the first
 * time, then forwards them. The notifier is closed after its first push.
 *
 * If the notifier ends without ever pushing, the source is closed and the
 * operator stops forwarding; no End is sent to the sink in that case.
 */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End when state.skip =>
              state.ended = true;
              state.sourceTalkback(. Close);
            | End => ()
            }
          );
        | Push(_) when !state.skip && !state.ended =>
          state.pulled = false;
          sink(. signal);
        | Push(_) => ()
        | End =>
          /* The notifier is only still open while skipping. */
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                if (state.skip) {
                  state.notifierTalkback(. Close);
                };
              | Pull when !state.pulled =>
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Per-subscription state of skipWhile.
 * - talkback: talkback of the source
 * - skip: true until the predicate has returned false for the first time
 */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable skip: bool,
};

/* Drops values while f returns true. The first value for which f returns
 * false and every value after it are forwarded; f is not called again after
 * that point. For every dropped value a Pull is sent upstream.
 */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) when state.skip =>
          if (f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Per-subscription state of switchMap.
 * - outerTalkback / innerTalkback: talkbacks of the outer source and of the
 *   currently active inner source
 * - outerPulled / innerPulled: true while a Pull to that upstream is
 *   outstanding
 * - innerActive: true while an inner source is subscribed and has not ended
 * - ended: true once the outer source ended or the sink closed
 */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source with f and
 * forwards the values of the most recent inner source only. When a new outer
 * value arrives, the active inner source is closed before the new one is
 * subscribed.
 *
 * End is sent once the outer source has ended and no inner source is active.
 */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            /* Keep the inner source flowing unless the sink already pulled
               it while handling the value above. */
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            if (state.ended) {
              sink(. signal);
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* Cancel the previous inner source before switching. */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );

/* Flattens a source of sources by always switching to the most recent inner
   source (switchMap with the identity function). */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Per-subscription state of take.
 * - ended: true once End was sent or the sink closed the subscription
 * - taken: number of values forwarded so far
 * - talkback: talkback of the source
 */
type takeStateT = {
  mutable ended: bool,
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most max values, then sends End and closes the source. With a
 * max of zero or less, End is sent and the source is closed as soon as the
 * source starts.
 */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          state.ended = true;
          sink(. End);
          tb(. Close);
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max && !state.ended =>
          state.taken = state.taken + 1;
          sink(. signal);
          /* ended is checked again because the sink may have closed the
             subscription while handling the value. */
          if (!state.ended && state.taken >= max) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when state.taken < max => state.talkback(. Pull)
              | Pull => ()
              | Close =>
                state.ended = true;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Emits only the last max values of the source, after the source has ended.
 * The source is drained eagerly: it is pulled on Start and again after every
 * value, while a queue keeps at most max values. On End the queue is handed
 * to makeTrampoline, which serves the queued values to the sink.
 *
 * With a max of zero or less the source is closed immediately and the sink
 * is connected to Wonka_sources.empty instead.
 */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let talkback = ref(talkbackPlaceholder);
      let queue = MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          tb(. Close);
          Wonka_sources.empty(sink);
        | Start(tb) =>
          talkback := tb;
          tb(. Pull);
        | Push(x) =>
          /* Drop the oldest value once the queue is full. */
          let size = MutableQueue.size(queue);
          if (size >= max && max > 0) {
            ignore(MutableQueue.pop(queue));
          };

          MutableQueue.add(queue, x);
          talkback^(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
        }
      );
    })
  );

/* Per-subscription state of takeUntil.
 * - ended: true once End was sent or the sink closed the subscription
 * - sourceTalkback / notifierTalkback: talkbacks of the two upstreams
 */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the values of the source until the notifier pushes for the first
 * time; at that point both upstreams are closed and End is sent. If the
 * source ends first, the notifier is closed and End is sent. An End of the
 * notifier is ignored.
 */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              state.ended = true;
              /* The notifier has done its job: close it so that it is not
                 left subscribed, and so that it cannot trigger a second End. */
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Per-subscription state of takeWhile.
 * - talkback: talkback of the source
 * - ended: true once End was sent to the sink
 */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Forwards values while f returns true. On the first value for which f
 * returns false, End is sent instead of the value and the source is closed.
 */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        | Push(x) when !state.ended =>
          if (!f(. x)) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );
    })
  );