Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
open Wonka_types;
open Wonka_helpers;

/* Mutable state shared by the callbacks of `buffer`. */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Queues every value pushed by `source`. Each time `notifier` pushes and the
   queue is non-empty, the queued values are emitted to the sink as one array
   and a fresh queue is started. When either `source` or `notifier` ends, the
   other one is closed, any remaining queued values are emitted, and the sink
   is ended. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started */
          notifier((. signal) =>
            switch (signal) {
            | Start(tb) => state.notifierTalkback = tb
            | Push(_) when !state.ended =>
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                let buffer = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(buffer)));
              }
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              };
              sink(. End);
            | End => ()
            }
          );
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          /* Allow the next Pull from the sink to reach both talkbacks again */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          if (Rebel.MutableQueue.size(state.buffer) > 0) {
            sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          };
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `combine`. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Remembers the latest value of `sourceA` and `sourceB`. Once both have
   produced a value, every further push from either side emits the tuple of
   the two latest values. The sink is ended when the second End signal
   arrives, i.e. after both sources have ended. */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        state.lastValA = Some(a);
        /* No value from B yet: ask B for one unless the sink's own Pull has
           already been forwarded to both sides */
        if (!state.gotSignal) {
          state.talkbackB(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        state.lastValB = Some(b);
        if (!state.gotSignal) {
          state.talkbackA(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Mutable state shared by the callbacks of `concatMap`. */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source using `f` and
   forwards the inner sources' values one inner source at a time. Outer values
   that arrive while an inner source is active are queued and processed once
   that inner source ends. The sink is ended once the outer source has ended,
   no inner source is active and the queue is empty. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            /* Continue with the next queued outer value, if any */
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None => ()
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );

/* `concatMap` with the identity function: flattens a source of sources one
   inner source at a time. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* Concatenates an array of sources by feeding them through `concatMap` with
   the identity function. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the pushed values for which `f` returns true. For a rejected
   value the source is pulled again instead of notifying the sink. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let talkback = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          talkback := tb;
          sink(. signal);
        | Push(x) when !f(. x) => talkback^(. Pull)
        | _ => sink(. signal)
        }
      );
    })
  );

/* Applies `f` to every pushed value and forwards the result; Start and End
   are passed through. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          /* The signal needs to be recreated for genType to generate
             the correct generics during codegen */
          switch (signal) {
          | Start(x) => Start(x)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      )
    )
  );

/* Mutable state shared by the callbacks of `mergeMap`. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source using `f`,
   subscribes to each inner source as soon as its outer value arrives and
   forwards the values of all inner sources. The sink is ended once the outer
   source has ended and no inner talkbacks remain registered. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            /* Unregister this inner source's talkback */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                /* Clear the registered talkbacks before closing them, so
                   that the inner Push/End guards above stop matching */
                let tbs = state.innerTalkbacks;
                state.ended = true;
                state.innerTalkbacks = Rebel.Array.makeEmpty();
                state.outerTalkback(. signal);
                Rebel.Array.forEach(tbs, tb => tb(. signal));
              | Pull =>
                if (!state.outerPulled) {
                  state.outerPulled = true;
                  state.outerTalkback(. Pull);
                };

                Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
              };
            },
        ),
      );
    })
  );

/* Merges an array of sources by feeding them through `mergeMap` with the
   identity function. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* `mergeMap` with the identity function: flattens a source of sources. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;

/* Calls `f` when the stream finishes: after End has been forwarded to the
   sink, or right before a Close from the sink is forwarded to the source. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          /* Wrap the talkback so that a Close from the sink is observed */
          sink(.
            Start(
              (. signal) => {
                switch (signal) {
                | Close => f(.)
                | _ => ()
                };
                talkback(. signal);
              },
            ),
          )
        | End =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    })
  );

/* Calls `f` with every pushed value before the signal is forwarded to the
   sink; all signals are forwarded unchanged. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        switch (signal) {
        | Push(x) => f(. x)
        | _ => ()
        };

        sink(. signal);
      })
    )
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;

/* Calls `f` after the Start signal has been forwarded to the sink; all
   signals are forwarded unchanged. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Mutable state shared by the callbacks of `sample`. */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable value: option('a),
  mutable pulled: bool,
  mutable ended: bool,
};

/* Stores the latest value pushed by `source`. Each time `notifier` pushes
   while a value is stored, that value is emitted to the sink and the stored
   value is cleared. When either `source` or `notifier` ends, the other one is
   closed and the sink is ended. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) =>
          state.value = Some(x);
          /* Re-arm the Pull guard below, as `buffer` does; without this
             reset every Pull after the first one would be dropped and the
             source would never be asked for another value */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds the pushed values with `f`, starting from `seed`, and emits the
   updated accumulator for every pushed value. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* Mutable state of a shared source created by `share`. */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Shares a single subscription to `source` between all sinks. The source is
   subscribed to when the list of sinks goes from empty to one entry, every
   signal is broadcast to all registered sinks, and the source is closed when
   the last sink sends Close. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            /* Only one Pull is forwarded until the next Push arrives */
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Mutable state shared by the callbacks of `skip`. */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable rest: int,
};

/* Drops the first `wait` pushed values, pulling the next value for each
   dropped one, and forwards every signal after that. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) when state.rest > 0 =>
          state.rest = state.rest - 1;
          state.talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `skipUntil`. */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Drops the values of `source` until `notifier` pushes its first value; the
   notifier is closed at that point and later source values are forwarded. If
   the notifier ends before pushing anything, the source is closed. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End when state.skip =>
              state.ended = true;
              state.sourceTalkback(. Close);
            | End => ()
            }
          );
        | Push(_) when !state.skip && !state.ended =>
          state.pulled = false;
          sink(. signal);
        | Push(_) => ()
        | End =>
          /* The notifier is still subscribed while values are skipped */
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                if (state.skip) {
                  state.notifierTalkback(. Close);
                };
              | Pull when !state.pulled =>
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `skipWhile`. */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable skip: bool,
};

/* Drops pushed values for as long as `f` returns true for them. The first
   value for which `f` returns false, and every signal after it, is
   forwarded without calling `f` again. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) when state.skip =>
          if (f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `switchMap`. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source using `f` and
   forwards the values of the most recent inner source only: when a new outer
   value arrives while an inner source is active, that inner source is closed
   first. The sink is ended once the outer source has ended and no inner
   source is active. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            if (state.ended) {
              sink(. signal);
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* Drop the previous inner source before switching to the new one */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );

/* `switchMap` with the identity function: always follows the most recent
   inner source of a source of sources. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Mutable state shared by the callbacks of `take`. */
type takeStateT = {
  mutable ended: bool,
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` pushed values. After the `max`-th value the sink is
   ended and the source is closed. If `max <= 0` the sink is ended and the
   source closed as soon as the source starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          state.ended = true;
          sink(. End);
          tb(. Close);
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max && !state.ended =>
          state.taken = state.taken + 1;
          sink(. signal);
          /* `ended` is re-checked because the sink may have sent Close
             while handling the Push above */
          if (!state.ended && state.taken >= max) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when state.taken < max => state.talkback(. Pull)
              | Pull => ()
              | Close =>
                state.ended = true;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Keeps only the last `max` values pushed by `source` in a queue, pulling the
   source eagerly. When the source ends, the queue is handed to
   `makeTrampoline` together with the sink (presumably emitting the queued
   values to the sink one by one; see Wonka_helpers). If `max <= 0` the source
   is closed right away and the sink is given `Wonka_sources.empty`. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let talkback = ref(talkbackPlaceholder);
      let queue = MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          tb(. Close);
          Wonka_sources.empty(sink);
        | Start(tb) =>
          talkback := tb;
          tb(. Pull);
        | Push(x) =>
          let size = MutableQueue.size(queue);
          /* Drop the oldest value once the queue holds `max` entries */
          if (size >= max && max > 0) {
            ignore(MutableQueue.pop(queue));
          };

          MutableQueue.add(queue, x);
          talkback^(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `takeUntil`. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the values of `source` until `notifier` pushes its first value.
   At that point both the notifier and the source are closed and the sink is
   ended. If the source ends first, the notifier is closed and the sink is
   ended. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              /* The guard keeps a second notifier push (or one arriving
                 after the stream already finished) from ending the sink
                 twice; the notifier is closed because it is no longer
                 needed once it has fired */
              state.ended = true;
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `takeWhile`. */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Forwards pushed values for as long as `f` returns true for them. On the
   first value for which `f` returns false, the sink is ended and the source
   is closed; that value is not forwarded. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        | Push(x) when !state.ended =>
          if (!f(. x)) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );
    })
  );