Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.8 29 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable state shared by the callbacks of `buffer`. */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Collects the values pushed by `source` into a queue and emits them to the
   sink as one array each time `notifier` pushes. Empty queues are not
   emitted. When either `source` or `notifier` ends, the other one is closed,
   any remaining queued values are flushed and the sink receives `End`. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed to once the source has started */
          notifier((. signal) =>
            switch (signal) {
            | Start(tb) => state.notifierTalkback = tb
            | Push(_) when !state.ended =>
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                /* Swap in a fresh queue before emitting the old one */
                let buffer = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(buffer)));
              }
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              };
              sink(. End);
            | End => ()
            }
          );
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          if (!state.pulled) {
            state.pulled = true;
            state.sourceTalkback(. Pull);
            state.notifierTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          if (Rebel.MutableQueue.size(state.buffer) > 0) {
            sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          };
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `combine`. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Combines two sources into a source of tuples. A tuple is pushed whenever
   one side pushes while the other side already has a last value. The sink
   receives `End` only on the second `End` signal (tracked by `endCounter`). */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        /* No value from B yet: remember A's value and ask B for one */
        state.lastValA = Some(a);
        if (!state.gotSignal) {
          state.talkbackB(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        /* No value from A yet: remember B's value and ask A for one */
        state.lastValB = Some(b);
        if (!state.gotSignal) {
          state.talkbackA(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Mutable state shared by the callbacks of `concatMap`. */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   forwards the inner sources' values one source at a time. Outer values that
   arrive while an inner source is active are queued and mapped once the
   active inner source ends. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let rec applyInnerSource = innerSource => {
        state.innerActive = true;
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            /* Continue with the next queued input, end, or pull the outer */
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None when !state.outerPulled =>
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            | None => ()
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          /* Only end the sink immediately when nothing is left to drain */
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* `concatMap` with the identity function over a source of sources. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* `concatMap` with the identity function over an array of sources. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the pushed values for which `f` returns true. For a rejected
   value the source is pulled again instead of notifying the sink. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let talkback = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          talkback := tb;
          sink(. signal);
        | Push(x) when !f(. x) => talkback^(. Pull)
        | _ => sink(. signal)
        }
      );
    })
  );

/* Applies `f` to every pushed value; `Start` and `End` pass through. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          /* The signal needs to be recreated for genType to generate
             the correct generics during codegen */
          switch (signal) {
          | Start(x) => Start(x)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      )
    )
  );

/* Mutable state shared by the callbacks of `mergeMap`. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   forwards the values of all active inner sources. The sink receives `End`
   once the outer source has ended and no inner talkbacks remain. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            /* Drop this inner source's talkback from the active list */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
            if (state.ended && exhausted) {
              sink(. End);
            } else if (!state.outerPulled && exhausted) {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. signal);
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
              state.innerTalkbacks = Rebel.Array.makeEmpty();
            | Pull =>
              if (!state.outerPulled && !state.ended) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              } else {
                state.outerPulled = false;
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
            },
        ),
      );
    })
  );

/* `mergeMap` with the identity function over an array of sources. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* `mergeMap` with the identity function over a source of sources. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;

/* Calls `f` once when the stream terminates, i.e. after the source's `End`
   has been forwarded to the sink or after the sink's `Close` has been
   forwarded to the source, whichever happens first. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);
      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          sink(.
            Start(
              (. signal) =>
                if (! ended^) {
                  switch (signal) {
                  | Pull => talkback(. signal)
                  | Close =>
                    ended := true;
                    talkback(. signal);
                    f(.);
                  };
                },
            ),
          )
        | Push(_) when ! ended^ => sink(. signal)
        | Push(_) => ()
        | End when ! ended^ =>
          ended := true;
          sink(. signal);
          f(.);
        | End => ()
        }
      );
    })
  );

/* Calls `f` with every pushed value before forwarding it to the sink.
   Signals are dropped once the stream has ended or has been closed. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);
      source((. signal) => {
        switch (signal) {
        | Start(talkback) =>
          sink(.
            Start(
              (. signal) =>
                if (! ended^) {
                  switch (signal) {
                  | Pull => talkback(. signal)
                  | Close =>
                    ended := true;
                    talkback(. signal);
                  };
                },
            ),
          )
        | Push(x) when ! ended^ =>
          f(. x);
          sink(. signal);
        | Push(_) => ()
        | End when ! ended^ =>
          ended := true;
          sink(. signal);
        | End => ()
        }
      });
    })
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;

/* Calls `f` right after the `Start` signal has been forwarded to the sink. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Mutable state shared by the callbacks of `sample`. */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable value: option('a),
  mutable pulled: bool,
  mutable ended: bool,
};

/* Remembers the latest value pushed by `source` and emits it to the sink
   when `notifier` pushes. The stored value is cleared after it has been
   emitted, so each source value is emitted at most once. When either side
   ends, the other one is closed and the sink receives `End`. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        /* Guarded so that a late push cannot pull talkbacks that were
           already closed when the operator terminated */
        | Push(x) when !state.ended =>
          state.value = Some(x);
          if (!state.pulled) {
            state.pulled = true;
            state.notifierTalkback(. Pull);
            state.sourceTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds pushed values into an accumulator starting at `seed` and pushes the
   updated accumulator for every incoming value. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* Mutable state shared by all subscriptions of a `share`d source. */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Shares one subscription to `source` between all sinks. The source is
   subscribed to when the sink list grows to exactly one entry and is closed
   when the last sink closes. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            /* Only one pull is forwarded until the next push arrives */
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Mutable state of `skip`. */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable rest: int,
};

/* Drops the first `wait` pushed values, pulling the source again for each
   dropped value, and forwards everything afterwards. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) when state.rest > 0 =>
          state.rest = state.rest - 1;
          state.talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `skipUntil`. */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Drops the source's values until `notifier` pushes for the first time; the
   notifier is closed at that point and later values are forwarded. If the
   notifier ends before pushing, the operator is marked as ended and the
   source is closed. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End when state.skip =>
              state.ended = true;
              state.sourceTalkback(. Close);
            | End => ()
            }
          );
        /* Once ended, pushes are ignored instead of falling through to the
           arms below, which would pull talkbacks that are already closed */
        | Push(_) when state.ended => ()
        | Push(_) when !state.skip =>
          state.pulled = false;
          sink(. signal);
        | Push(_) when !state.pulled =>
          state.pulled = true;
          state.sourceTalkback(. Pull);
          state.notifierTalkback(. Pull);
        | Push(_) => state.pulled = false
        /* Guarded like every other operator in this file so that the sink
           never receives `End` after it has closed the stream */
        | End when !state.ended =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                if (state.skip) {
                  state.notifierTalkback(. Close);
                };
              | Pull when !state.pulled =>
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state of `skipWhile`. */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable skip: bool,
};

/* Drops pushed values while `f` returns true for them. The first value for
   which `f` returns false, and every signal after it, is forwarded. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) when state.skip =>
          if (f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `switchMap`. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f`. When a
   new outer value arrives while an inner source is active, that inner source
   is closed and replaced by the new one. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource => {
        state.innerActive = true;
        innerSource((. signal) =>
          if (state.innerActive) {
            switch (signal) {
            | Start(tb) =>
              state.innerTalkback = tb;
              state.innerPulled = false;
              tb(. Pull);
            | Push(_) =>
              sink(. signal);
              if (!state.innerPulled) {
                state.innerTalkback(. Pull);
              } else {
                state.innerPulled = false;
              };
            | End =>
              state.innerActive = false;
              if (state.ended) {
                sink(. signal);
              } else if (!state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
            };
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* Cancel the previous inner source before switching */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* `switchMap` with the identity function over a source of sources. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Mutable state of `take`. */
type takeStateT = {
  mutable ended: bool,
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` pushed values, then sends `End` to the sink and
   closes the source. With `max <= 0` the sink is ended and the source is
   closed as soon as the source starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          state.ended = true;
          sink(. End);
          tb(. Close);
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max && !state.ended =>
          state.taken = state.taken + 1;
          sink(. signal);
          /* `ended` is re-checked because the sink may have closed the
             stream while handling the push above */
          if (!state.ended && state.taken >= max) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when state.taken < max => state.talkback(. Pull)
              | Pull => ()
              | Close =>
                state.ended = true;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Keeps the last `max` values pushed by the source in a queue and, once the
   source ends, hands the queue to `makeTrampoline` to emit them to the sink.
   With `max <= 0` the source is closed and the sink is subscribed to
   `Wonka_sources.empty` instead. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let talkback = ref(talkbackPlaceholder);
      let queue = MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          tb(. Close);
          Wonka_sources.empty(sink);
        | Start(tb) =>
          talkback := tb;
          tb(. Pull);
        | Push(x) =>
          let size = MutableQueue.size(queue);
          /* Evict the oldest value once the queue holds `max` entries */
          if (size >= max && max > 0) {
            ignore(MutableQueue.pop(queue));
          };

          MutableQueue.add(queue, x);
          talkback^(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `takeUntil`. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the source's values until `notifier` pushes for the first time.
   At that point both the notifier and the source are closed and the sink
   receives `End`. If the source ends first, the notifier is closed. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            /* Guarded so that a notifier which keeps pushing cannot end the
               sink or close the source more than once; the notifier itself
               is closed as well since it is no longer needed */
            | Push(_) when !state.ended =>
              state.ended = true;
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Mutable state of `takeWhile`. */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Forwards pushed values while `f` returns true for them. On the first value
   for which `f` returns false, the sink receives `End` (the value itself is
   not forwarded) and the source is closed. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        | Push(x) when !state.ended =>
          if (!f(. x)) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );
    })
  );