Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.11 29 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type bufferStateT('a) = { 5 mutable buffer: Rebel.MutableQueue.t('a), 6 mutable sourceTalkback: (. talkbackT) => unit, 7 mutable notifierTalkback: (. talkbackT) => unit, 8 mutable pulled: bool, 9 mutable ended: bool, 10}; 11 12[@genType] 13let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) => 14 curry(source => 15 curry(sink => { 16 let state = { 17 buffer: Rebel.MutableQueue.make(), 18 sourceTalkback: talkbackPlaceholder, 19 notifierTalkback: talkbackPlaceholder, 20 pulled: false, 21 ended: false, 22 }; 23 24 source((. signal) => 25 switch (signal) { 26 | Start(tb) => 27 state.sourceTalkback = tb; 28 29 notifier((. signal) => 30 switch (signal) { 31 | Start(tb) => state.notifierTalkback = tb 32 | Push(_) when !state.ended => 33 if (Rebel.MutableQueue.size(state.buffer) > 0) { 34 let buffer = state.buffer; 35 state.buffer = Rebel.MutableQueue.make(); 36 sink(. Push(Rebel.MutableQueue.toArray(buffer))); 37 } 38 | Push(_) => () 39 | End when !state.ended => 40 state.ended = true; 41 state.sourceTalkback(. Close); 42 if (Rebel.MutableQueue.size(state.buffer) > 0) { 43 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 44 }; 45 sink(. End); 46 | End => () 47 } 48 ); 49 | Push(value) when !state.ended => 50 Rebel.MutableQueue.add(state.buffer, value); 51 if (!state.pulled) { 52 state.pulled = true; 53 state.sourceTalkback(. Pull); 54 state.notifierTalkback(. Pull); 55 } else { 56 state.pulled = false; 57 }; 58 | Push(_) => () 59 | End when !state.ended => 60 state.ended = true; 61 state.notifierTalkback(. Close); 62 if (Rebel.MutableQueue.size(state.buffer) > 0) { 63 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 64 }; 65 sink(. End); 66 | End => () 67 } 68 ); 69 70 sink(. 71 Start( 72 (. signal) => 73 if (!state.ended) { 74 switch (signal) { 75 | Close => 76 state.ended = true; 77 state.sourceTalkback(. Close); 78 state.notifierTalkback(. Close); 79 | Pull when !state.pulled => 80 state.pulled = true; 81 state.sourceTalkback(. Pull); 82 state.notifierTalkback(. Pull); 83 | Pull => () 84 }; 85 }, 86 ), 87 ); 88 }) 89 ); 90 91type combineStateT('a, 'b) = { 92 mutable talkbackA: (. talkbackT) => unit, 93 mutable talkbackB: (. talkbackT) => unit, 94 mutable lastValA: option('a), 95 mutable lastValB: option('b), 96 mutable gotSignal: bool, 97 mutable endCounter: int, 98 mutable ended: bool, 99}; 100 101[@genType] 102let combine = 103 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) => 104 curry(sink => { 105 let state = { 106 talkbackA: talkbackPlaceholder, 107 talkbackB: talkbackPlaceholder, 108 lastValA: None, 109 lastValB: None, 110 gotSignal: false, 111 endCounter: 0, 112 ended: false, 113 }; 114 115 sourceA((. signal) => 116 switch (signal, state.lastValB) { 117 | (Start(tb), _) => state.talkbackA = tb 118 | (Push(a), None) => 119 state.lastValA = Some(a); 120 if (!state.gotSignal) { 121 state.talkbackB(. Pull); 122 } else { 123 state.gotSignal = false; 124 }; 125 | (Push(a), Some(b)) when !state.ended => 126 state.lastValA = Some(a); 127 state.gotSignal = false; 128 sink(. Push((a, b))); 129 | (End, _) when state.endCounter < 1 => 130 state.endCounter = state.endCounter + 1 131 | (End, _) when !state.ended => 132 state.ended = true; 133 sink(. End); 134 | _ => () 135 } 136 ); 137 138 sourceB((. signal) => 139 switch (signal, state.lastValA) { 140 | (Start(tb), _) => state.talkbackB = tb 141 | (Push(b), None) => 142 state.lastValB = Some(b); 143 if (!state.gotSignal) { 144 state.talkbackA(. Pull); 145 } else { 146 state.gotSignal = false; 147 }; 148 | (Push(b), Some(a)) when !state.ended => 149 state.lastValB = Some(b); 150 state.gotSignal = false; 151 sink(. Push((a, b))); 152 | (End, _) when state.endCounter < 1 => 153 state.endCounter = state.endCounter + 1 154 | (End, _) when !state.ended => 155 state.ended = true; 156 sink(. End); 157 | _ => () 158 } 159 ); 160 161 sink(. 162 Start( 163 (. signal) => 164 if (!state.ended) { 165 switch (signal) { 166 | Close => 167 state.ended = true; 168 state.talkbackA(. Close); 169 state.talkbackB(. Close); 170 | Pull when !state.gotSignal => 171 state.gotSignal = true; 172 state.talkbackA(. signal); 173 state.talkbackB(. signal); 174 | Pull => () 175 }; 176 }, 177 ), 178 ); 179 }); 180 181type concatMapStateT('a) = { 182 inputQueue: Rebel.MutableQueue.t('a), 183 mutable outerTalkback: (. talkbackT) => unit, 184 mutable outerPulled: bool, 185 mutable innerTalkback: (. talkbackT) => unit, 186 mutable innerActive: bool, 187 mutable innerPulled: bool, 188 mutable ended: bool, 189}; 190 191[@genType] 192let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 193 curry(source => 194 curry(sink => { 195 let state: concatMapStateT('a) = { 196 inputQueue: Rebel.MutableQueue.make(), 197 outerTalkback: talkbackPlaceholder, 198 outerPulled: false, 199 innerTalkback: talkbackPlaceholder, 200 innerActive: false, 201 innerPulled: false, 202 ended: false, 203 }; 204 205 let rec applyInnerSource = innerSource => { 206 state.innerActive = true; 207 innerSource((. signal) => 208 switch (signal) { 209 | Start(tb) => 210 state.innerTalkback = tb; 211 state.innerPulled = false; 212 tb(. Pull); 213 | Push(_) when state.innerActive => 214 sink(. signal); 215 if (!state.innerPulled) { 216 state.innerTalkback(. Pull); 217 } else { 218 state.innerPulled = false; 219 }; 220 | Push(_) => () 221 | End when state.innerActive => 222 state.innerActive = false; 223 switch (Rebel.MutableQueue.pop(state.inputQueue)) { 224 | Some(input) => applyInnerSource(f(. input)) 225 | None when state.ended => sink(. End) 226 | None when !state.outerPulled => 227 state.outerPulled = true; 228 state.outerTalkback(. Pull); 229 | None => () 230 }; 231 | End => () 232 } 233 ); 234 }; 235 236 source((. signal) => 237 switch (signal) { 238 | Start(tb) => state.outerTalkback = tb 239 | Push(x) when !state.ended => 240 state.outerPulled = false; 241 if (state.innerActive) { 242 Rebel.MutableQueue.add(state.inputQueue, x); 243 } else { 244 applyInnerSource(f(. x)); 245 }; 246 | Push(_) => () 247 | End when !state.ended => 248 state.ended = true; 249 if (!state.innerActive 250 && Rebel.MutableQueue.isEmpty(state.inputQueue)) { 251 sink(. End); 252 }; 253 | End => () 254 } 255 ); 256 257 sink(. 258 Start( 259 (. signal) => 260 switch (signal) { 261 | Pull => 262 if (!state.ended && !state.outerPulled) { 263 state.outerPulled = true; 264 state.outerTalkback(. Pull); 265 }; 266 if (state.innerActive && !state.innerPulled) { 267 state.innerPulled = true; 268 state.innerTalkback(. Pull); 269 }; 270 | Close => 271 if (!state.ended) { 272 state.ended = true; 273 state.outerTalkback(. Close); 274 }; 275 if (state.innerActive) { 276 state.innerActive = false; 277 state.innerTalkback(. Close); 278 }; 279 }, 280 ), 281 ); 282 }) 283 ); 284 285[@genType] 286let concatAll = (source: sourceT(sourceT('a))): sourceT('a) => 287 concatMap((. x) => x, source); 288 289[@genType] 290let concat = (sources: array(sourceT('a))): sourceT('a) => 291 concatMap((. x) => x, Wonka_sources.fromArray(sources)); 292 293[@genType] 294let filter = (f: (. 'a) => bool): operatorT('a, 'a) => 295 curry(source => 296 curry(sink => { 297 let talkback = ref(talkbackPlaceholder); 298 299 source((. signal) => 300 switch (signal) { 301 | Start(tb) => 302 talkback := tb; 303 sink(. signal); 304 | Push(x) when !f(. x) => talkback^(. Pull) 305 | _ => sink(. signal) 306 } 307 ); 308 }) 309 ); 310 311[@genType] 312let map = (f: (. 'a) => 'b): operatorT('a, 'b) => 313 curry(source => 314 curry(sink => 315 source((. signal) => 316 sink(. 317 /* The signal needs to be recreated for genType to generate 318 the correct generics during codegen */ 319 switch (signal) { 320 | Start(x) => Start(x) 321 | Push(x) => Push(f(. x)) 322 | End => End 323 }, 324 ) 325 ) 326 ) 327 ); 328 329type mergeMapStateT = { 330 mutable outerTalkback: (. talkbackT) => unit, 331 mutable outerPulled: bool, 332 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit), 333 mutable ended: bool, 334}; 335 336[@genType] 337let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 338 curry(source => 339 curry(sink => { 340 let state: mergeMapStateT = { 341 outerTalkback: talkbackPlaceholder, 342 outerPulled: false, 343 innerTalkbacks: Rebel.Array.makeEmpty(), 344 ended: false, 345 }; 346 347 let applyInnerSource = innerSource => { 348 let talkback = ref(talkbackPlaceholder); 349 350 innerSource((. signal) => 351 switch (signal) { 352 | Start(tb) => 353 talkback := tb; 354 state.innerTalkbacks = 355 Rebel.Array.append(state.innerTalkbacks, tb); 356 tb(. Pull); 357 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => 358 sink(. Push(x)); 359 talkback^(. Pull); 360 | Push(_) => () 361 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 => 362 state.innerTalkbacks = 363 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^); 364 let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0; 365 if (state.ended && exhausted) { 366 sink(. End); 367 } else if (!state.outerPulled && exhausted) { 368 state.outerPulled = true; 369 state.outerTalkback(. Pull); 370 }; 371 | End => () 372 } 373 ); 374 }; 375 376 source((. signal) => 377 switch (signal) { 378 | Start(tb) => state.outerTalkback = tb 379 | Push(x) when !state.ended => 380 state.outerPulled = false; 381 applyInnerSource(f(. x)); 382 if (!state.outerPulled) { 383 state.outerPulled = true; 384 state.outerTalkback(. Pull); 385 }; 386 | Push(_) => () 387 | End when !state.ended => 388 state.ended = true; 389 if (Rebel.Array.size(state.innerTalkbacks) === 0) { 390 sink(. End); 391 }; 392 | End => () 393 } 394 ); 395 396 sink(. 397 Start( 398 (. signal) => 399 switch (signal) { 400 | Close => 401 if (!state.ended) { 402 state.ended = true; 403 state.outerTalkback(. signal); 404 }; 405 406 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal)); 407 state.innerTalkbacks = Rebel.Array.makeEmpty(); 408 | Pull => 409 if (!state.outerPulled && !state.ended) { 410 state.outerPulled = true; 411 state.outerTalkback(. Pull); 412 } else { 413 state.outerPulled = false; 414 }; 415 416 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); 417 }, 418 ), 419 ); 420 }) 421 ); 422 423[@genType] 424let merge = (sources: array(sourceT('a))): sourceT('a) => 425 mergeMap((. x) => x, Wonka_sources.fromArray(sources)); 426 427[@genType] 428let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) => 429 mergeMap((. x) => x, source); 430 431[@genType] 432let flatten = mergeAll; 433 434[@genType] 435let onEnd = (f: (. unit) => unit): operatorT('a, 'a) => 436 curry(source => 437 curry(sink => { 438 let ended = ref(false); 439 source((. signal) => 440 switch (signal) { 441 | Start(talkback) => 442 sink(. 443 Start( 444 (. signal) => 445 if (! ended^) { 446 switch (signal) { 447 | Pull => talkback(. signal) 448 | Close => 449 ended := true; 450 talkback(. signal); 451 f(.); 452 }; 453 }, 454 ), 455 ) 456 | Push(_) when ! ended^ => sink(. signal) 457 | Push(_) => () 458 | End when ! ended^ => 459 ended := true; 460 sink(. signal); 461 f(.); 462 | End => () 463 } 464 ); 465 }) 466 ); 467 468[@genType] 469let onPush = (f: (. 'a) => unit): operatorT('a, 'a) => 470 curry(source => 471 curry(sink => { 472 let ended = ref(false); 473 source((. signal) => { 474 switch (signal) { 475 | Start(talkback) => 476 sink(. 477 Start( 478 (. signal) => 479 if (! ended^) { 480 switch (signal) { 481 | Pull => talkback(. signal) 482 | Close => 483 ended := true; 484 talkback(. signal); 485 }; 486 }, 487 ), 488 ) 489 | Push(x) when ! ended^ => 490 f(. x); 491 sink(. signal); 492 | Push(_) => () 493 | End when ! ended^ => 494 ended := true; 495 sink(. signal); 496 | End => () 497 } 498 }); 499 }) 500 ); 501 502[@genType] 503let tap = onPush; 504 505[@genType] 506let onStart = (f: (. unit) => unit): operatorT('a, 'a) => 507 curry(source => 508 curry(sink => 509 source((. signal) => 510 switch (signal) { 511 | Start(_) => 512 sink(. signal); 513 f(.); 514 | _ => sink(. signal) 515 } 516 ) 517 ) 518 ); 519 520type sampleStateT('a) = { 521 mutable sourceTalkback: (. talkbackT) => unit, 522 mutable notifierTalkback: (. talkbackT) => unit, 523 mutable value: option('a), 524 mutable pulled: bool, 525 mutable ended: bool, 526}; 527 528[@genType] 529let sample = (notifier: sourceT('a)): operatorT('b, 'b) => 530 curry(source => 531 curry(sink => { 532 let state = { 533 sourceTalkback: talkbackPlaceholder, 534 notifierTalkback: talkbackPlaceholder, 535 value: None, 536 pulled: false, 537 ended: false, 538 }; 539 540 source((. signal) => 541 switch (signal) { 542 | Start(tb) => state.sourceTalkback = tb 543 | Push(x) => 544 state.value = Some(x); 545 if (!state.pulled) { 546 state.pulled = true; 547 state.notifierTalkback(. Pull); 548 state.sourceTalkback(. Pull); 549 } else { 550 state.pulled = false; 551 }; 552 | End when !state.ended => 553 state.ended = true; 554 state.notifierTalkback(. Close); 555 sink(. End); 556 | End => () 557 } 558 ); 559 560 notifier((. signal) => 561 switch (signal, state.value) { 562 | (Start(tb), _) => state.notifierTalkback = tb 563 | (End, _) when !state.ended => 564 state.ended = true; 565 state.sourceTalkback(. Close); 566 sink(. End); 567 | (End, _) => () 568 | (Push(_), Some(x)) when !state.ended => 569 state.value = None; 570 sink(. Push(x)); 571 | (Push(_), _) => () 572 } 573 ); 574 575 sink(. 576 Start( 577 (. signal) => 578 if (!state.ended) { 579 switch (signal) { 580 | Pull when !state.pulled => 581 state.pulled = true; 582 state.sourceTalkback(. Pull); 583 state.notifierTalkback(. Pull); 584 | Pull => () 585 | Close => 586 state.ended = true; 587 state.sourceTalkback(. Close); 588 state.notifierTalkback(. Close); 589 }; 590 }, 591 ), 592 ); 593 }) 594 ); 595 596[@genType] 597let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) => 598 curry(source => 599 curry(sink => { 600 let acc = ref(seed); 601 602 source((. signal) => 603 sink(. 604 switch (signal) { 605 | Push(x) => 606 acc := f(. acc^, x); 607 Push(acc^); 608 | Start(x) => Start(x) 609 | End => End 610 }, 611 ) 612 ); 613 }) 614 ); 615 616type shareStateT('a) = { 617 mutable sinks: Rebel.Array.t(sinkT('a)), 618 mutable talkback: (. talkbackT) => unit, 619 mutable gotSignal: bool, 620}; 621 622[@genType] 623let share = (source: sourceT('a)): sourceT('a) => { 624 let state = { 625 sinks: Rebel.Array.makeEmpty(), 626 talkback: talkbackPlaceholder, 627 gotSignal: false, 628 }; 629 630 sink => { 631 state.sinks = Rebel.Array.append(state.sinks, sink); 632 633 if (Rebel.Array.size(state.sinks) === 1) { 634 source((. signal) => 635 switch (signal) { 636 | Push(_) => 637 state.gotSignal = false; 638 Rebel.Array.forEach(state.sinks, sink => sink(. signal)); 639 | Start(x) => state.talkback = x 640 | End => 641 Rebel.Array.forEach(state.sinks, sink => sink(. End)); 642 state.sinks = Rebel.Array.makeEmpty(); 643 } 644 ); 645 }; 646 647 sink(. 648 Start( 649 (. signal) => 650 switch (signal) { 651 | Close => 652 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); 653 if (Rebel.Array.size(state.sinks) === 0) { 654 state.talkback(. Close); 655 }; 656 | Pull when !state.gotSignal => 657 state.gotSignal = true; 658 state.talkback(. signal); 659 | Pull => () 660 }, 661 ), 662 ); 663 }; 664}; 665 666type skipStateT = { 667 mutable talkback: (. talkbackT) => unit, 668 mutable rest: int, 669}; 670 671[@genType] 672let skip = (wait: int): operatorT('a, 'a) => 673 curry(source => 674 curry(sink => { 675 let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait}; 676 677 source((. signal) => 678 switch (signal) { 679 | Start(tb) => 680 state.talkback = tb; 681 sink(. signal); 682 | Push(_) when state.rest > 0 => 683 state.rest = state.rest - 1; 684 state.talkback(. Pull); 685 | _ => sink(. signal) 686 } 687 ); 688 }) 689 ); 690 691type skipUntilStateT = { 692 mutable sourceTalkback: (. talkbackT) => unit, 693 mutable notifierTalkback: (. talkbackT) => unit, 694 mutable skip: bool, 695 mutable pulled: bool, 696 mutable ended: bool, 697}; 698 699[@genType] 700let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) => 701 curry(source => 702 curry(sink => { 703 let state: skipUntilStateT = { 704 sourceTalkback: talkbackPlaceholder, 705 notifierTalkback: talkbackPlaceholder, 706 skip: true, 707 pulled: false, 708 ended: false, 709 }; 710 711 source((. signal) => 712 switch (signal) { 713 | Start(tb) => 714 state.sourceTalkback = tb; 715 716 notifier((. signal) => 717 switch (signal) { 718 | Start(innerTb) => 719 state.notifierTalkback = innerTb; 720 innerTb(. Pull); 721 | Push(_) => 722 state.skip = false; 723 state.notifierTalkback(. Close); 724 | End when state.skip => 725 state.ended = true; 726 state.sourceTalkback(. Close); 727 | End => () 728 } 729 ); 730 | Push(_) when !state.skip && !state.ended => 731 state.pulled = false; 732 sink(. signal); 733 | Push(_) when !state.pulled => 734 state.pulled = true; 735 state.sourceTalkback(. Pull); 736 state.notifierTalkback(. Pull); 737 | Push(_) => state.pulled = false 738 | End => 739 if (state.skip) { 740 state.notifierTalkback(. Close); 741 }; 742 state.ended = true; 743 sink(. End); 744 } 745 ); 746 747 sink(. 748 Start( 749 (. signal) => 750 if (!state.ended) { 751 switch (signal) { 752 | Close => 753 state.ended = true; 754 state.sourceTalkback(. Close); 755 if (state.skip) { 756 state.notifierTalkback(. Close); 757 }; 758 | Pull when !state.pulled => 759 state.pulled = true; 760 if (state.skip) { 761 state.notifierTalkback(. Pull); 762 }; 763 state.sourceTalkback(. Pull); 764 | Pull => () 765 }; 766 }, 767 ), 768 ); 769 }) 770 ); 771 772type skipWhileStateT = { 773 mutable talkback: (. talkbackT) => unit, 774 mutable skip: bool, 775}; 776 777[@genType] 778let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) => 779 curry(source => 780 curry(sink => { 781 let state: skipWhileStateT = { 782 talkback: talkbackPlaceholder, 783 skip: true, 784 }; 785 786 source((. signal) => 787 switch (signal) { 788 | Start(tb) => 789 state.talkback = tb; 790 sink(. signal); 791 | Push(x) when state.skip => 792 if (f(. x)) { 793 state.talkback(. Pull); 794 } else { 795 state.skip = false; 796 sink(. signal); 797 } 798 | _ => sink(. signal) 799 } 800 ); 801 }) 802 ); 803 804type switchMapStateT('a) = { 805 mutable outerTalkback: (. talkbackT) => unit, 806 mutable outerPulled: bool, 807 mutable innerTalkback: (. talkbackT) => unit, 808 mutable innerActive: bool, 809 mutable innerPulled: bool, 810 mutable ended: bool, 811}; 812 813[@genType] 814let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 815 curry(source => 816 curry(sink => { 817 let state: switchMapStateT('a) = { 818 outerTalkback: talkbackPlaceholder, 819 outerPulled: false, 820 innerTalkback: talkbackPlaceholder, 821 innerActive: false, 822 innerPulled: false, 823 ended: false, 824 }; 825 826 let applyInnerSource = innerSource => { 827 state.innerActive = true; 828 innerSource((. signal) => 829 if (state.innerActive) { 830 switch (signal) { 831 | Start(tb) => 832 state.innerTalkback = tb; 833 state.innerPulled = false; 834 tb(. Pull); 835 | Push(_) => 836 sink(. signal); 837 if (!state.innerPulled) { 838 state.innerTalkback(. Pull); 839 } else { 840 state.innerPulled = false; 841 }; 842 | End => 843 state.innerActive = false; 844 if (state.ended) { 845 sink(. signal); 846 } else if (!state.outerPulled) { 847 state.outerPulled = true; 848 state.outerTalkback(. Pull); 849 }; 850 }; 851 } 852 ); 853 }; 854 855 source((. signal) => 856 switch (signal) { 857 | Start(tb) => state.outerTalkback = tb 858 | Push(x) when !state.ended => 859 if (state.innerActive) { 860 state.innerTalkback(. Close); 861 state.innerTalkback = talkbackPlaceholder; 862 }; 863 864 if (!state.outerPulled) { 865 state.outerPulled = true; 866 state.outerTalkback(. Pull); 867 } else { 868 state.outerPulled = false; 869 }; 870 871 applyInnerSource(f(. x)); 872 | Push(_) => () 873 | End when !state.ended => 874 state.ended = true; 875 if (!state.innerActive) { 876 sink(. End); 877 }; 878 | End => () 879 } 880 ); 881 882 sink(. 883 Start( 884 (. signal) => 885 switch (signal) { 886 | Pull => 887 if (!state.ended && !state.outerPulled) { 888 state.outerPulled = true; 889 state.outerTalkback(. Pull); 890 }; 891 if (state.innerActive && !state.innerPulled) { 892 state.innerPulled = true; 893 state.innerTalkback(. Pull); 894 }; 895 | Close => 896 if (!state.ended) { 897 state.ended = true; 898 state.outerTalkback(. Close); 899 }; 900 if (state.innerActive) { 901 state.innerActive = false; 902 state.innerTalkback(. Close); 903 }; 904 }, 905 ), 906 ); 907 }) 908 ); 909 910[@genType] 911let switchAll = (source: sourceT(sourceT('a))): sourceT('a) => 912 switchMap((. x) => x, source); 913 914type takeStateT = { 915 mutable ended: bool, 916 mutable taken: int, 917 mutable talkback: (. talkbackT) => unit, 918}; 919 920[@genType] 921let take = (max: int): operatorT('a, 'a) => 922 curry(source => 923 curry(sink => { 924 let state: takeStateT = { 925 ended: false, 926 taken: 0, 927 talkback: talkbackPlaceholder, 928 }; 929 930 source((. signal) => 931 switch (signal) { 932 | Start(tb) when max <= 0 => 933 state.ended = true; 934 sink(. End); 935 tb(. Close); 936 | Start(tb) => state.talkback = tb 937 | Push(_) when state.taken < max && !state.ended => 938 state.taken = state.taken + 1; 939 sink(. signal); 940 if (!state.ended && state.taken >= max) { 941 state.ended = true; 942 sink(. End); 943 state.talkback(. Close); 944 }; 945 | Push(_) => () 946 | End when !state.ended => 947 state.ended = true; 948 sink(. End); 949 | End => () 950 } 951 ); 952 953 sink(. 954 Start( 955 (. signal) => 956 if (!state.ended) { 957 switch (signal) { 958 | Pull when state.taken < max => state.talkback(. Pull) 959 | Pull => () 960 | Close => 961 state.ended = true; 962 state.talkback(. Close); 963 }; 964 }, 965 ), 966 ); 967 }) 968 ); 969 970type takeLastStateT('a) = { 971 mutable queue: Rebel.MutableQueue.t('a), 972 mutable talkback: (. talkbackT) => unit, 973}; 974 975[@genType] 976let takeLast = (max: int): operatorT('a, 'a) => 977 curry(source => 978 curry(sink => { 979 let state: takeLastStateT('a) = { 980 queue: Rebel.MutableQueue.make(), 981 talkback: talkbackPlaceholder, 982 }; 983 984 source((. signal) => 985 switch (signal) { 986 | Start(talkback) when max <= 0 => 987 talkback(. Close); 988 Wonka_sources.empty(sink); 989 | Start(talkback) => 990 state.talkback = talkback; 991 talkback(. Pull); 992 | Push(x) => 993 let size = Rebel.MutableQueue.size(state.queue); 994 if (size >= max && max > 0) { 995 ignore(Rebel.MutableQueue.pop(state.queue)); 996 }; 997 998 Rebel.MutableQueue.add(state.queue, x); 999 state.talkback(. Pull); 1000 | End => 1001 Wonka_sources.fromArray( 1002 Rebel.MutableQueue.toArray(state.queue), 1003 sink, 1004 ) 1005 } 1006 ); 1007 }) 1008 ); 1009 1010type takeUntilStateT = { 1011 mutable ended: bool, 1012 mutable sourceTalkback: (. talkbackT) => unit, 1013 mutable notifierTalkback: (. talkbackT) => unit, 1014}; 1015 1016[@genType] 1017let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) => 1018 curry(source => 1019 curry(sink => { 1020 let state: takeUntilStateT = { 1021 ended: false, 1022 sourceTalkback: talkbackPlaceholder, 1023 notifierTalkback: talkbackPlaceholder, 1024 }; 1025 1026 source((. signal) => 1027 switch (signal) { 1028 | Start(tb) => 1029 state.sourceTalkback = tb; 1030 1031 notifier((. signal) => 1032 switch (signal) { 1033 | Start(innerTb) => 1034 state.notifierTalkback = innerTb; 1035 innerTb(. Pull); 1036 | Push(_) => 1037 state.ended = true; 1038 state.sourceTalkback(. Close); 1039 sink(. End); 1040 | End => () 1041 } 1042 ); 1043 | End when !state.ended => 1044 state.ended = true; 1045 state.notifierTalkback(. Close); 1046 sink(. End); 1047 | End => () 1048 | Push(_) when !state.ended => sink(. signal) 1049 | Push(_) => () 1050 } 1051 ); 1052 1053 sink(. 1054 Start( 1055 (. signal) => 1056 if (!state.ended) { 1057 switch (signal) { 1058 | Close => 1059 state.ended = true; 1060 state.sourceTalkback(. Close); 1061 state.notifierTalkback(. Close); 1062 | Pull => state.sourceTalkback(. Pull) 1063 }; 1064 }, 1065 ), 1066 ); 1067 }) 1068 ); 1069 1070type takeWhileStateT = { 1071 mutable talkback: (. talkbackT) => unit, 1072 mutable ended: bool, 1073}; 1074 1075[@genType] 1076let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) => 1077 curry(source => 1078 curry(sink => { 1079 let state: takeWhileStateT = { 1080 talkback: talkbackPlaceholder, 1081 ended: false, 1082 }; 1083 1084 source((. signal) => 1085 switch (signal) { 1086 | Start(tb) => 1087 state.talkback = tb; 1088 sink(. signal); 1089 | End when !state.ended => 1090 state.ended = true; 1091 sink(. End); 1092 | End => () 1093 | Push(x) when !state.ended => 1094 if (!f(. x)) { 1095 state.ended = true; 1096 sink(. End); 1097 state.talkback(. Close); 1098 } else { 1099 sink(. signal); 1100 } 1101 | Push(_) => () 1102 } 1103 ); 1104 }) 1105 );