Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.14 29 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable state shared by the callbacks of `buffer`. */
type bufferStateT('a) = {
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Queues every value pushed by `source` and emits the queued values as one
   array each time `notifier` pushes (only when the queue is non-empty).
   When either the source or the notifier ends, the other one is closed, any
   remaining queued values are flushed as a final array, and End is sent. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is only subscribed once the source has started */
          notifier((. signal) => {
            switch (signal) {
            | Start(tb) => state.notifierTalkback = tb
            | Push(_) when !state.ended =>
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                /* Swap in a fresh queue before emitting the old one */
                let buffer = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(buffer)));
              }
            | Push(_) => ()
            | End when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (Rebel.MutableQueue.size(state.buffer) > 0) {
                sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
              };
              sink(. End);
            | End => ()
            };
            ();
          });
        | Push(value) when !state.ended =>
          Rebel.MutableQueue.add(state.buffer, value);
          if (!state.pulled) {
            state.pulled = true;
            state.sourceTalkback(. Pull);
            state.notifierTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          if (Rebel.MutableQueue.size(state.buffer) > 0) {
            sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
          };
          sink(. End);
        | End => ()
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `combine`. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  mutable endCounter: int,
  mutable ended: bool,
};

/* Combines two sources into a source of tuples. Each source's latest value
   is remembered; a tuple is pushed whenever one source pushes while the
   other already has a value. End is only forwarded once both sources have
   ended (the first End merely increments `endCounter`). */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signal) =>
      switch (signal, state.lastValB) {
      | (Start(tb), _) => state.talkbackA = tb
      | (Push(a), None) =>
        /* B has no value yet: remember A's value and ask B for one */
        state.lastValA = Some(a);
        if (!state.gotSignal) {
          state.talkbackB(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(a), Some(b)) when !state.ended =>
        state.lastValA = Some(a);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sourceB((. signal) =>
      switch (signal, state.lastValA) {
      | (Start(tb), _) => state.talkbackB = tb
      | (Push(b), None) =>
        /* A has no value yet: remember B's value and ask A for one */
        state.lastValB = Some(b);
        if (!state.gotSignal) {
          state.talkbackA(. Pull);
        } else {
          state.gotSignal = false;
        };
      | (Push(b), Some(a)) when !state.ended =>
        state.lastValB = Some(b);
        state.gotSignal = false;
        sink(. Push((a, b)));
      | (End, _) when state.endCounter < 1 =>
        state.endCounter = state.endCounter + 1
      | (End, _) when !state.ended =>
        state.ended = true;
        sink(. End);
      | _ => ()
      }
    );

    sink(.
      Start(
        (. signal) =>
          if (!state.ended) {
            switch (signal) {
            | Close =>
              state.ended = true;
              state.talkbackA(. Close);
              state.talkbackB(. Close);
            | Pull when !state.gotSignal =>
              state.gotSignal = true;
              state.talkbackA(. signal);
              state.talkbackB(. signal);
            | Pull => ()
            };
          },
      ),
    );
  });

/* Mutable state shared by the callbacks of `concatMap`. */
type concatMapStateT('a) = {
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   forwards the inner sources' values one source at a time. Outer values that
   arrive while an inner source is active are queued and started in order
   when the active inner source ends. End is sent once the outer source has
   ended, no inner source is active and the queue is empty. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let rec applyInnerSource = innerSource => {
        state.innerActive = true;
        innerSource((. signal) => {
          switch (signal) {
          | Start(tb) =>
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            /* Continue with the next queued input, finish, or pull the
               outer source for more input */
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None when !state.outerPulled =>
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            | None => ()
            };
          | End => ()
          };
          ();
        });
        ();
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* `concatMap` with the identity function over a source of sources. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, source);

/* `concatMap` with the identity function over an array of sources, which is
   turned into a source via `Wonka_sources.fromArray`. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  concatMap((. x) => x, Wonka_sources.fromArray(sources));

/* Forwards only the pushed values for which `f` returns true. For a rejected
   value the source is pulled again instead of forwarding the value. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let talkback = ref(talkbackPlaceholder);

      source((. signal) => {
        switch (signal) {
        | Start(tb) =>
          talkback := tb;
          sink(. signal);
        | Push(x) when !f(. x) => talkback^(. Pull)
        | _ => sink(. signal)
        };
        ();
      });
    })
  );

/* Applies `f` to every pushed value and forwards the result. Start and End
   are forwarded as they are. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        sink(.
          /* The signal needs to be recreated for genType to generate
             the correct generics during codegen */
          switch (signal) {
          | Start(x) => Start(x)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      })
    )
  );

/* Mutable state shared by the callbacks of `mergeMap`. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   forwards the values of all inner sources as they arrive. The talkbacks of
   the running inner sources are tracked in `innerTalkbacks`; End is sent
   once the outer source has ended and no inner talkbacks remain. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            /* Drop this inner source's talkback from the tracked list */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
            if (state.ended && exhausted) {
              sink(. End);
            } else if (!state.outerPulled && exhausted) {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. signal);
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
              state.innerTalkbacks = Rebel.Array.makeEmpty();
            | Pull =>
              if (!state.outerPulled && !state.ended) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              } else {
                state.outerPulled = false;
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
            },
        ),
      );
    })
  );

/* `mergeMap` with the identity function over an array of sources, which is
   turned into a source via `Wonka_sources.fromArray`. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, Wonka_sources.fromArray(sources));

/* `mergeMap` with the identity function over a source of sources. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;

/* Forwards all signals and calls `f` once, either after the source's End
   has been forwarded or after the sink's Close has been passed to the
   source, whichever happens first. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);
      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          /* Wrap the talkback so a Close from the sink can be observed */
          sink(.
            Start(
              (. signal) =>
                if (! ended^) {
                  switch (signal) {
                  | Pull => talkback(. signal)
                  | Close =>
                    ended := true;
                    talkback(. signal);
                    f(.);
                  };
                },
            ),
          )
        | Push(_) when ! ended^ => sink(. signal)
        | Push(_) => ()
        | End when ! ended^ =>
          ended := true;
          sink(. signal);
          f(.);
        | End => ()
        }
      );
    })
  );

/* Calls `f` with every pushed value before forwarding it to the sink.
   Signals are no longer forwarded after End or after the sink's Close. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let ended = ref(false);
      source((. signal) => {
        switch (signal) {
        | Start(talkback) =>
          sink(.
            Start(
              (. signal) =>
                if (! ended^) {
                  switch (signal) {
                  | Pull => talkback(. signal)
                  | Close =>
                    ended := true;
                    talkback(. signal);
                  };
                },
            ),
          )
        | Push(x) when ! ended^ =>
          f(. x);
          sink(. signal);
        | Push(_) => ()
        | End when ! ended^ =>
          ended := true;
          sink(. signal);
        | End => ()
        };
        ();
      });
    })
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;

/* Forwards all signals and calls `f` right after the Start signal has been
   forwarded to the sink. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Start(_) =>
          sink(. signal);
          f(.);
        | _ => sink(. signal)
        }
      )
    )
  );

/* Mutable state shared by the callbacks of `sample`. */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable value: option('a),
  mutable pulled: bool,
  mutable ended: bool,
};

/* Remembers the latest value pushed by `source` and pushes it to the sink
   when `notifier` pushes; the remembered value is cleared after it has been
   emitted. When either the source or the notifier ends, the other one is
   closed and End is sent. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) =>
          state.value = Some(x);
          if (!state.pulled) {
            state.pulled = true;
            state.notifierTalkback(. Pull);
            state.sourceTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Folds the pushed values with `f`, starting from `seed`, and pushes every
   intermediate accumulator value. Start and End are forwarded as they are. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        sink(.
          switch (signal) {
          | Push(x) =>
            acc := f(. acc^, x);
            Push(acc^);
          | Start(x) => Start(x)
          | End => End
          },
        )
      );
    })
  );

/* Mutable state of `share`; it lives outside the returned source, so it is
   shared by every sink that subscribes. */
type shareStateT('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  mutable gotSignal: bool,
};

/* Returns a source that distributes the signals of a single subscription to
   `source` across all subscribed sinks. `source` is subscribed to when the
   list of sinks grows to exactly one entry, and is closed when the last
   sink closes. On End every sink is notified and the list is emptied. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};

/* Mutable state shared by the callbacks of `skip`. */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable rest: int,
};

/* Drops the first `wait` pushed values (pulling the source again for each
   dropped value) and forwards every signal after that. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) when state.rest > 0 =>
          state.rest = state.rest - 1;
          state.talkback(. Pull);
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `skipUntil`. */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/* Drops the values of `source` until `notifier` pushes for the first time;
   the notifier is closed at that point and later values are forwarded. If
   the notifier ends before pushing, the source is closed. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) => {
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) =>
              state.skip = false;
              state.notifierTalkback(. Close);
            | End when state.skip =>
              state.ended = true;
              state.sourceTalkback(. Close);
            | End => ()
            };
            ();
          });
        | Push(_) when !state.skip && !state.ended =>
          state.pulled = false;
          sink(. signal);
        | Push(_) when !state.pulled =>
          /* Still skipping: ask both sources for their next signal */
          state.pulled = true;
          state.sourceTalkback(. Pull);
          state.notifierTalkback(. Pull);
        | Push(_) => state.pulled = false
        | End =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                if (state.skip) {
                  state.notifierTalkback(. Close);
                };
              | Pull when !state.pulled =>
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              | Pull => ()
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `skipWhile`. */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable skip: bool,
};

/* Drops pushed values for as long as `f` returns true. The first value for
   which `f` returns false, and every signal after it, is forwarded. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) when state.skip =>
          if (f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | _ => sink(. signal)
        }
      );
    })
  );

/* Mutable state shared by the callbacks of `switchMap`. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/* Maps every value of the outer source to an inner source via `f` and
   forwards the inner source's values. When a new outer value arrives while
   an inner source is active, that inner source is closed and replaced by
   the new one. End is sent once the outer source has ended and no inner
   source is active. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource => {
        state.innerActive = true;
        innerSource((. signal) =>
          if (state.innerActive) {
            switch (signal) {
            | Start(tb) =>
              state.innerTalkback = tb;
              state.innerPulled = false;
              tb(. Pull);
            | Push(_) =>
              sink(. signal);
              if (!state.innerPulled) {
                state.innerTalkback(. Pull);
              } else {
                state.innerPulled = false;
              };
            | End =>
              state.innerActive = false;
              if (state.ended) {
                sink(. signal);
              } else if (!state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
            };
          }
        );
        ();
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* Cancel the currently running inner source, if any */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* `switchMap` with the identity function over a source of sources. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. x) => x, source);

/* Mutable state shared by the callbacks of `take`. */
type takeStateT = {
  mutable ended: bool,
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` pushed values; after the last one End is sent and
   the source is closed. With `max <= 0` End is sent and the source is
   closed as soon as the source starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          state.ended = true;
          sink(. End);
          tb(. Close);
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max && !state.ended =>
          state.taken = state.taken + 1;
          sink(. signal);
          /* `ended` is re-checked because the sink may have closed while
             handling the push above */
          if (!state.ended && state.taken >= max) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when state.taken < max => state.talkback(. Pull)
              | Pull => ()
              | Close =>
                state.ended = true;
                state.talkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `takeLast`. */
type takeLastStateT('a) = {
  mutable queue: Rebel.MutableQueue.t('a),
  mutable talkback: (. talkbackT) => unit,
};

/* Keeps the last `max` pushed values in a queue, pulling the source itself
   until it ends, and then emits the queued values to the sink via
   `Wonka_sources.fromArray`. With `max <= 0` the source is closed on Start
   and the sink is given `Wonka_sources.empty` instead. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeLastStateT('a) = {
        queue: Rebel.MutableQueue.make(),
        talkback: talkbackPlaceholder,
      };

      source((. signal) => {
        switch (signal) {
        | Start(talkback) when max <= 0 =>
          talkback(. Close);
          Wonka_sources.empty(sink);
        | Start(talkback) =>
          state.talkback = talkback;
          talkback(. Pull);
        | Push(x) =>
          let size = Rebel.MutableQueue.size(state.queue);
          /* Drop the oldest value once the queue holds `max` values */
          if (size >= max && max > 0) {
            ignore(Rebel.MutableQueue.pop(state.queue));
          };

          Rebel.MutableQueue.add(state.queue, x);
          state.talkback(. Pull);
        | End =>
          Wonka_sources.fromArray(
            Rebel.MutableQueue.toArray(state.queue),
            sink,
          )
        };
        ();
      });
    })
  );

/* Mutable state shared by the callbacks of `takeUntil`. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the values of `source` until `notifier` pushes for the first
   time; at that point both the notifier and the source are closed and End
   is sent. If the source ends first, the notifier is closed and End is
   forwarded. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) => {
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            /* Only the first push while still running terminates the
               stream. The guard, together with closing the notifier,
               prevents a notifier that pushes repeatedly from closing the
               source and sending End to the sink more than once. */
            | Push(_) when !state.ended =>
              state.ended = true;
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            };
            ();
          });
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );

/* Mutable state shared by the callbacks of `takeWhile`. */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/* Forwards pushed values for as long as `f` returns true. On the first
   value for which `f` returns false, End is sent and the source is closed;
   that value itself is not forwarded. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | End when !state.ended =>
          state.ended = true;
          sink(. End);
        | End => ()
        | Push(x) when !state.ended =>
          if (!f(. x)) {
            state.ended = true;
            sink(. End);
            state.talkback(. Close);
          } else {
            sink(. signal);
          }
        | Push(_) => ()
        }
      );
    })
  );