Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
open Wonka_types;
open Wonka_helpers;

/* Mutable state for `buffer`: the queue of values collected so far, the
   talkbacks of both upstream sources, and the pull/end flags. */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Collects the values pushed by `source` into a queue and emits them as one
   array every time `notifier` pushes (only when the queue is non-empty).
   When either the source or the notifier ends, the other one is closed, any
   remaining buffered values are flushed as a final array and the sink is
   ended. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started */
          notifier((. signal) =>
            switch (signal) {
            | Start(tb) => state.notifierTalkback = tb
            | Push(_) when !state.ended =>
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                /* Swap in a fresh queue before emitting so values pushed
                   while the sink is running land in the next batch */
                let buffer = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(buffer)));
              }
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              };
              sink(. End);
            | End => ()
            }
          );
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          if (!state.pulled) {
            state.pulled = true;
            state.sourceTalkback(. Pull);
            state.notifierTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          if (Rebel.MutableQueue.size(state.buffer) > 0) {
            sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          };
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state for `combine`: both talkbacks, the last value seen from each
   source, a flag tracking an outstanding pull, and end bookkeeping. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Combines two sources into a source of tuples. A tuple of the latest values
   is pushed whenever one source pushes and the other has already produced a
   value. The first End is only counted; the sink is ended on the second End. */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        /* B has no value yet: remember A's value and ask B for one unless a
           pull from the sink is already outstanding */
        state.lastValA = Some(a);
        if (!state.gotSignal) {
          state.talkbackB(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        /* Mirror image of the A handler above */
        state.lastValB = Some(b);
        if (!state.gotSignal) {
          state.talkbackA(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Mutable state for `concatMap`: the queue of outer values waiting for the
   active inner source to finish, plus talkbacks and flags for both levels. */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and runs
   the inner sources strictly one after another. Outer values that arrive
   while an inner source is active are queued. The sink is ended once the
   outer source has ended, no inner source is active and the queue is empty. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            /* Continue with the next queued input, finish, or ask the outer
               source for more */
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None when !state.outerPulled =>
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            | None => ()
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Flattens a source of sources sequentially (`concatMap` with identity). */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* Runs the given sources one after another, in array order. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the values for which `f` returns true. For a rejected value
   the source is pulled again so the sink still gets a chance at a value. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let talkback = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          talkback := tb;
          sink(. signal);
        | Push(x) when !f(. x) => talkback^(. Pull)
        | _ => sink(. signal)
        }
      );
    })
  );

/* Transforms every pushed value with `f`; Start and End pass through. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          /* The signal needs to be recreated for genType to generate
             the correct generics during codegen */
          switch (signal) {
          | Start(x) => Start(x)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      )
    )
  );

/* Mutable state for `mergeMap`: the outer talkback and flags, plus the
   talkbacks of all inner sources that are currently running. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   subscribes to each inner source immediately, forwarding all of their
   values. The sink is ended once the outer source has ended and no inner
   talkbacks remain. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            /* Drop this inner source's talkback from the active list */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
            if (state.ended && exhausted) {
              sink(. End);
            } else if (!state.outerPulled && exhausted) {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. signal);
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
              state.innerTalkbacks = Rebel.Array.makeEmpty();
            | Pull =>
              if (!state.outerPulled && !state.ended) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              } else {
                state.outerPulled = false;
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
            },
        ),
      );
    })
  );

/* Merges the given sources into one (`mergeMap` with identity). */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* Flattens a source of sources concurrently (`mergeMap` with identity). */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;

/* Calls `f` when the stream terminates: either after End has been forwarded
   to the sink, or right before a Close from the sink is passed on to the
   source. `f` is invoked at most once; an End that arrives after the stream
   has already terminated is dropped. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Guards against invoking `f` twice, e.g. Close followed by End */
      let ended = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          sink(.
            Start(
              (. signal) => {
                switch (signal) {
                | Close when !ended^ =>
                  ended := true;
                  f(.);
                | _ => ()
                };
                /* Talkback signals are always passed on to the source */
                talkback(. signal);
              },
            ),
          )
        | End when !ended^ =>
          ended := true;
          sink(. signal);
          f(.);
        | End => ()
        | _ => sink(. signal)
        }
      );
    })
  );

/* Calls `f` with every pushed value before forwarding the signal. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        switch (signal) {
        | Push(x) => f(. x)
        | _ => ()
        };

        sink(. signal);
      })
    )
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;

/* Calls `f` right after the Start signal has been forwarded to the sink. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Mutable state for `sample`: both talkbacks, the most recent source value
   that has not been emitted yet, and the pull/end flags. */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable value: option('a),
  mutable pulled: bool,
  mutable ended: bool,
};

/* Remembers the latest value pushed by `source` and emits it when `notifier`
   pushes. A stored value is emitted at most once (it is cleared on emit).
   When either stream ends the other one is closed and the sink is ended. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) =>
          state.value = Some(x);
          if (!state.pulled) {
            state.pulled = true;
            state.notifierTalkback(. Pull);
            state.sourceTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds the pushed values with `f`, starting from `seed`, and pushes every
   intermediate accumulator value. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* Mutable state for `share`: the attached sinks, the single upstream
   talkback, and a flag tracking an outstanding pull. */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Shares one subscription to `source` between all sinks. The source is
   subscribed to when the sink list grows to exactly one entry, Push and End
   are broadcast to every attached sink, and the source is closed when the
   last sink closes. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Mutable state for `skip`: the source talkback and how many values are
   still to be dropped. */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable rest: int,
};

/* Drops the first `wait` pushed values (pulling again for each dropped one)
   and forwards everything after that. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) when state.rest > 0 =>
          state.rest = state.rest - 1;
          state.talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state for `skipUntil`: both talkbacks, whether values are still
   being skipped, and the pull/end flags. */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Drops the values of `source` until `notifier` pushes for the first time;
   the notifier is closed at that point and later source values are
   forwarded. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End when state.skip =>
              /* NOTE(review): the source is closed here but no End is
                 forwarded to the sink - confirm this is intended */
              state.ended = true;
              state.sourceTalkback(. Close);
            | End => ()
            }
          );
        | Push(_) when !state.skip && !state.ended =>
          state.pulled = false;
          sink(. signal);
        | Push(_) when !state.pulled =>
          /* Still skipping: drop the value and ask for the next one */
          state.pulled = true;
          state.sourceTalkback(. Pull);
          state.notifierTalkback(. Pull);
        | Push(_) => state.pulled = false
        | End =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                if (state.skip) {
                  state.notifierTalkback(. Close);
                };
              | Pull when !state.pulled =>
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state for `skipWhile`: the source talkback and whether values are
   still being skipped. */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable skip: bool,
};

/* Drops values while `f` returns true. The first value for which `f` returns
   false, and every value after it, is forwarded without consulting `f`. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) when state.skip =>
          if (f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state for `switchMap`: talkbacks and flags for the outer source
   and for the single active inner source. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f`. When a new
   outer value arrives, the currently active inner source is closed before the
   new one is subscribed to. The sink is ended once the outer source has ended
   and no inner source is active. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            if (state.ended) {
              sink(. signal);
            } else if (!state.outerPulled) {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* Cancel the previous inner source before switching */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Flattens a source of sources, switching to the latest inner source
   (`switchMap` with identity). */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Mutable state for `take`: the end flag, the number of values forwarded so
   far, and the source talkback. */
type takeStateT = {
  mutable ended: bool,
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` values, then ends the sink and closes the source.
   With `max <= 0` the sink is ended and the source closed as soon as the
   source starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          state.ended = true;
          sink(. End);
          tb(. Close);
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max && !state.ended =>
          state.taken = state.taken + 1;
          sink(. signal);
          /* `ended` is re-checked because the sink may have closed the
             stream while handling the push above */
          if (!state.ended && state.taken >= max) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when state.taken < max => state.talkback(. Pull)
              | Pull => ()
              | Close =>
                state.ended = true;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Pulls the source until it ends while keeping only the most recent `max`
   values in a queue, then hands the queue to `makeTrampoline` to deliver
   them to the sink. With `max <= 0` the source is closed immediately and
   the sink is subscribed to `Wonka_sources.empty` instead. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let talkback = ref(talkbackPlaceholder);
      let queue = MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          tb(. Close);
          Wonka_sources.empty(sink);
        | Start(tb) =>
          talkback := tb;
          tb(. Pull);
        | Push(x) =>
          let size = MutableQueue.size(queue);
          /* Drop the oldest value once the queue is full */
          if (size >= max && max > 0) {
            ignore(MutableQueue.pop(queue));
          };

          MutableQueue.add(queue, x);
          talkback^(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
        }
      );
    })
  );

/* Mutable state for `takeUntil`: the end flag and both talkbacks. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the values of `source` until `notifier` pushes for the first
   time. At that point both the notifier and the source are closed and the
   sink is ended. If the source ends first, the notifier is closed. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              state.ended = true;
              /* The notifier has done its job; close it so it does not keep
                 running after the stream has ended */
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            /* Ignore notifier pushes once the stream has already ended so
               the sink never receives a second End */
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Mutable state for `takeWhile`: the source talkback and the end flag. */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Forwards values while `f` returns true. On the first value for which `f`
   returns false the sink is ended and the source closed; that value itself
   is not forwarded. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        | Push(x) when !state.ended =>
          if (!f(. x)) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );
    })
  );