Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.5 29 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type bufferStateT('a) = { 5 mutable buffer: Rebel.MutableQueue.t('a), 6 mutable sourceTalkback: (. talkbackT) => unit, 7 mutable notifierTalkback: (. talkbackT) => unit, 8 mutable pulled: bool, 9 mutable ended: bool, 10}; 11 12[@genType] 13let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) => 14 curry(source => 15 curry(sink => { 16 let state = { 17 buffer: Rebel.MutableQueue.make(), 18 sourceTalkback: talkbackPlaceholder, 19 notifierTalkback: talkbackPlaceholder, 20 pulled: false, 21 ended: false, 22 }; 23 24 source((. signal) => 25 switch (signal) { 26 | Start(tb) => 27 state.sourceTalkback = tb; 28 29 notifier((. signal) => 30 switch (signal) { 31 | Start(tb) => state.notifierTalkback = tb 32 | Push(_) when !state.ended => 33 if (Rebel.MutableQueue.size(state.buffer) > 0) { 34 let buffer = state.buffer; 35 state.buffer = Rebel.MutableQueue.make(); 36 sink(. Push(Rebel.MutableQueue.toArray(buffer))); 37 } 38 | Push(_) => () 39 | End when !state.ended => 40 state.ended = true; 41 state.sourceTalkback(. Close); 42 if (Rebel.MutableQueue.size(state.buffer) > 0) { 43 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 44 }; 45 sink(. End); 46 | End => () 47 } 48 ); 49 | Push(value) when !state.ended => 50 Rebel.MutableQueue.add(state.buffer, value); 51 if (!state.pulled) { 52 state.pulled = true; 53 state.sourceTalkback(. Pull); 54 state.notifierTalkback(. Pull); 55 } else { 56 state.pulled = false; 57 }; 58 | Push(_) => () 59 | End when !state.ended => 60 state.ended = true; 61 state.notifierTalkback(. Close); 62 if (Rebel.MutableQueue.size(state.buffer) > 0) { 63 sink(. Push(Rebel.MutableQueue.toArray(state.buffer))); 64 }; 65 sink(. End); 66 | End => () 67 } 68 ); 69 70 sink(. 71 Start( 72 (. signal) => 73 if (!state.ended) { 74 switch (signal) { 75 | Close => 76 state.ended = true; 77 state.sourceTalkback(. Close); 78 state.notifierTalkback(. Close); 79 | Pull when !state.pulled => 80 state.pulled = true; 81 state.sourceTalkback(. Pull); 82 state.notifierTalkback(. Pull); 83 | Pull => () 84 }; 85 }, 86 ), 87 ); 88 }) 89 ); 90 91type combineStateT('a, 'b) = { 92 mutable talkbackA: (. talkbackT) => unit, 93 mutable talkbackB: (. talkbackT) => unit, 94 mutable lastValA: option('a), 95 mutable lastValB: option('b), 96 mutable gotSignal: bool, 97 mutable endCounter: int, 98 mutable ended: bool, 99}; 100 101[@genType] 102let combine = 103 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) => 104 curry(sink => { 105 let state = { 106 talkbackA: talkbackPlaceholder, 107 talkbackB: talkbackPlaceholder, 108 lastValA: None, 109 lastValB: None, 110 gotSignal: false, 111 endCounter: 0, 112 ended: false, 113 }; 114 115 sourceA((. signal) => 116 switch (signal, state.lastValB) { 117 | (Start(tb), _) => state.talkbackA = tb 118 | (Push(a), None) => 119 state.lastValA = Some(a); 120 if (!state.gotSignal) { 121 state.talkbackB(. Pull); 122 } else { 123 state.gotSignal = false; 124 }; 125 | (Push(a), Some(b)) when !state.ended => 126 state.lastValA = Some(a); 127 state.gotSignal = false; 128 sink(. Push((a, b))); 129 | (End, _) when state.endCounter < 1 => 130 state.endCounter = state.endCounter + 1 131 | (End, _) when !state.ended => 132 state.ended = true; 133 sink(. End); 134 | _ => () 135 } 136 ); 137 138 sourceB((. signal) => 139 switch (signal, state.lastValA) { 140 | (Start(tb), _) => state.talkbackB = tb 141 | (Push(b), None) => 142 state.lastValB = Some(b); 143 if (!state.gotSignal) { 144 state.talkbackA(. Pull); 145 } else { 146 state.gotSignal = false; 147 }; 148 | (Push(b), Some(a)) when !state.ended => 149 state.lastValB = Some(b); 150 state.gotSignal = false; 151 sink(. Push((a, b))); 152 | (End, _) when state.endCounter < 1 => 153 state.endCounter = state.endCounter + 1 154 | (End, _) when !state.ended => 155 state.ended = true; 156 sink(. End); 157 | _ => () 158 } 159 ); 160 161 sink(. 162 Start( 163 (. signal) => 164 if (!state.ended) { 165 switch (signal) { 166 | Close => 167 state.ended = true; 168 state.talkbackA(. Close); 169 state.talkbackB(. Close); 170 | Pull when !state.gotSignal => 171 state.gotSignal = true; 172 state.talkbackA(. signal); 173 state.talkbackB(. signal); 174 | Pull => () 175 }; 176 }, 177 ), 178 ); 179 }); 180 181type concatMapStateT('a) = { 182 inputQueue: Rebel.MutableQueue.t('a), 183 mutable outerTalkback: (. talkbackT) => unit, 184 mutable outerPulled: bool, 185 mutable innerTalkback: (. talkbackT) => unit, 186 mutable innerActive: bool, 187 mutable innerPulled: bool, 188 mutable ended: bool, 189}; 190 191[@genType] 192let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 193 curry(source => 194 curry(sink => { 195 let state: concatMapStateT('a) = { 196 inputQueue: Rebel.MutableQueue.make(), 197 outerTalkback: talkbackPlaceholder, 198 outerPulled: false, 199 innerTalkback: talkbackPlaceholder, 200 innerActive: false, 201 innerPulled: false, 202 ended: false, 203 }; 204 205 let rec applyInnerSource = innerSource => 206 innerSource((. signal) => 207 switch (signal) { 208 | Start(tb) => 209 state.innerActive = true; 210 state.innerTalkback = tb; 211 state.innerPulled = false; 212 tb(. Pull); 213 | Push(_) when state.innerActive => 214 sink(. signal); 215 if (!state.innerPulled) { 216 state.innerTalkback(. Pull); 217 } else { 218 state.innerPulled = false; 219 }; 220 | Push(_) => () 221 | End when state.innerActive => 222 state.innerActive = false; 223 switch (Rebel.MutableQueue.pop(state.inputQueue)) { 224 | Some(input) => applyInnerSource(f(. input)) 225 | None when state.ended => sink(. End) 226 | None when !state.outerPulled => 227 state.outerPulled = true; 228 state.outerTalkback(. Pull); 229 | None => () 230 }; 231 | End => () 232 } 233 ); 234 235 source((. signal) => 236 switch (signal) { 237 | Start(tb) => state.outerTalkback = tb 238 | Push(x) when !state.ended => 239 state.outerPulled = false; 240 if (state.innerActive) { 241 Rebel.MutableQueue.add(state.inputQueue, x); 242 } else { 243 applyInnerSource(f(. x)); 244 }; 245 | Push(_) => () 246 | End when !state.ended => 247 state.ended = true; 248 if (!state.innerActive 249 && Rebel.MutableQueue.isEmpty(state.inputQueue)) { 250 sink(. End); 251 }; 252 | End => () 253 } 254 ); 255 256 sink(. 257 Start( 258 (. signal) => 259 switch (signal) { 260 | Pull => 261 if (!state.ended && !state.outerPulled) { 262 state.outerPulled = true; 263 state.outerTalkback(. Pull); 264 }; 265 if (state.innerActive && !state.innerPulled) { 266 state.innerPulled = true; 267 state.innerTalkback(. Pull); 268 }; 269 | Close => 270 if (!state.ended) { 271 state.ended = true; 272 state.outerTalkback(. Close); 273 }; 274 if (state.innerActive) { 275 state.innerActive = false; 276 state.innerTalkback(. Close); 277 }; 278 }, 279 ), 280 ); 281 }) 282 ); 283 284[@genType] 285let concatAll = (source: sourceT(sourceT('a))): sourceT('a) => 286 concatMap((. x) => x, source); 287 288[@genType] 289let concat = (sources: array(sourceT('a))): sourceT('a) => 290 concatMap((. x) => x, Wonka_sources.fromArray(sources)); 291 292[@genType] 293let filter = (f: (. 'a) => bool): operatorT('a, 'a) => 294 curry(source => 295 curry(sink => { 296 let talkback = ref(talkbackPlaceholder); 297 298 source((. signal) => 299 switch (signal) { 300 | Start(tb) => 301 talkback := tb; 302 sink(. signal); 303 | Push(x) when !f(. x) => talkback^(. Pull) 304 | _ => sink(. signal) 305 } 306 ); 307 }) 308 ); 309 310[@genType] 311let map = (f: (. 'a) => 'b): operatorT('a, 'b) => 312 curry(source => 313 curry(sink => 314 source((. signal) => 315 sink(. 316 /* The signal needs to be recreated for genType to generate 317 the correct generics during codegen */ 318 switch (signal) { 319 | Start(x) => Start(x) 320 | Push(x) => Push(f(. x)) 321 | End => End 322 }, 323 ) 324 ) 325 ) 326 ); 327 328type mergeMapStateT = { 329 mutable outerTalkback: (. talkbackT) => unit, 330 mutable outerPulled: bool, 331 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit), 332 mutable ended: bool, 333}; 334 335[@genType] 336let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 337 curry(source => 338 curry(sink => { 339 let state: mergeMapStateT = { 340 outerTalkback: talkbackPlaceholder, 341 outerPulled: false, 342 innerTalkbacks: Rebel.Array.makeEmpty(), 343 ended: false, 344 }; 345 346 let applyInnerSource = innerSource => { 347 let talkback = ref(talkbackPlaceholder); 348 349 innerSource((. signal) => 350 switch (signal) { 351 | Start(tb) => 352 talkback := tb; 353 state.innerTalkbacks = 354 Rebel.Array.append(state.innerTalkbacks, tb); 355 tb(. Pull); 356 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => 357 sink(. Push(x)); 358 talkback^(. Pull); 359 | Push(_) => () 360 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 => 361 state.innerTalkbacks = 362 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^); 363 let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0; 364 if (state.ended && exhausted) { 365 sink(. End); 366 } else if (!state.outerPulled && exhausted) { 367 state.outerPulled = true; 368 state.outerTalkback(. Pull); 369 }; 370 | End => () 371 } 372 ); 373 }; 374 375 source((. signal) => 376 switch (signal) { 377 | Start(tb) => state.outerTalkback = tb 378 | Push(x) when !state.ended => 379 state.outerPulled = false; 380 applyInnerSource(f(. x)); 381 if (!state.outerPulled) { 382 state.outerPulled = true; 383 state.outerTalkback(. Pull); 384 }; 385 | Push(_) => () 386 | End when !state.ended => 387 state.ended = true; 388 if (Rebel.Array.size(state.innerTalkbacks) === 0) { 389 sink(. End); 390 }; 391 | End => () 392 } 393 ); 394 395 sink(. 396 Start( 397 (. signal) => 398 switch (signal) { 399 | Close => 400 if (!state.ended) { 401 state.ended = true; 402 state.outerTalkback(. signal); 403 }; 404 405 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal)); 406 state.innerTalkbacks = Rebel.Array.makeEmpty(); 407 | Pull => 408 if (!state.outerPulled && !state.ended) { 409 state.outerPulled = true; 410 state.outerTalkback(. Pull); 411 } else { 412 state.outerPulled = false; 413 }; 414 415 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull)); 416 }, 417 ), 418 ); 419 }) 420 ); 421 422[@genType] 423let merge = (sources: array(sourceT('a))): sourceT('a) => 424 mergeMap((. x) => x, Wonka_sources.fromArray(sources)); 425 426[@genType] 427let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) => 428 mergeMap((. x) => x, source); 429 430[@genType] 431let flatten = mergeAll; 432 433[@genType] 434let onEnd = (f: (. unit) => unit): operatorT('a, 'a) => 435 curry(source => 436 curry(sink => { 437 let ended = ref(false); 438 source((. signal) => 439 switch (signal) { 440 | Start(talkback) => 441 sink(. 442 Start( 443 (. signal) => 444 if (! ended^) { 445 switch (signal) { 446 | Pull => talkback(. signal) 447 | Close => 448 ended := true; 449 talkback(. signal); 450 f(.); 451 }; 452 }, 453 ), 454 ) 455 | Push(_) when ! ended^ => sink(. signal) 456 | Push(_) => () 457 | End when ! ended^ => 458 ended := true; 459 sink(. signal); 460 f(.); 461 | End => () 462 } 463 ); 464 }) 465 ); 466 467[@genType] 468let onPush = (f: (. 'a) => unit): operatorT('a, 'a) => 469 curry(source => 470 curry(sink => { 471 let ended = ref(false); 472 source((. signal) => { 473 switch (signal) { 474 | Start(talkback) => 475 sink(. 476 Start( 477 (. signal) => 478 if (! ended^) { 479 switch (signal) { 480 | Pull => talkback(. signal) 481 | Close => 482 ended := true; 483 talkback(. signal); 484 }; 485 }, 486 ), 487 ) 488 | Push(x) when ! ended^ => 489 f(. x); 490 sink(. signal); 491 | Push(_) => () 492 | End when ! ended^ => 493 ended := true; 494 sink(. signal); 495 | End => () 496 } 497 }); 498 }) 499 ); 500 501[@genType] 502let tap = onPush; 503 504[@genType] 505let onStart = (f: (. unit) => unit): operatorT('a, 'a) => 506 curry(source => 507 curry(sink => 508 source((. signal) => 509 switch (signal) { 510 | Start(_) => 511 sink(. signal); 512 f(.); 513 | _ => sink(. signal) 514 } 515 ) 516 ) 517 ); 518 519type sampleStateT('a) = { 520 mutable sourceTalkback: (. talkbackT) => unit, 521 mutable notifierTalkback: (. talkbackT) => unit, 522 mutable value: option('a), 523 mutable pulled: bool, 524 mutable ended: bool, 525}; 526 527[@genType] 528let sample = (notifier: sourceT('a)): operatorT('b, 'b) => 529 curry(source => 530 curry(sink => { 531 let state = { 532 sourceTalkback: talkbackPlaceholder, 533 notifierTalkback: talkbackPlaceholder, 534 value: None, 535 pulled: false, 536 ended: false, 537 }; 538 539 source((. signal) => 540 switch (signal) { 541 | Start(tb) => state.sourceTalkback = tb 542 | Push(x) => 543 state.value = Some(x); 544 if (!state.pulled) { 545 state.pulled = true; 546 state.notifierTalkback(. Pull); 547 state.sourceTalkback(. Pull); 548 } else { 549 state.pulled = false; 550 }; 551 | End when !state.ended => 552 state.ended = true; 553 state.notifierTalkback(. Close); 554 sink(. End); 555 | End => () 556 } 557 ); 558 559 notifier((. signal) => 560 switch (signal, state.value) { 561 | (Start(tb), _) => state.notifierTalkback = tb 562 | (End, _) when !state.ended => 563 state.ended = true; 564 state.sourceTalkback(. Close); 565 sink(. End); 566 | (End, _) => () 567 | (Push(_), Some(x)) when !state.ended => 568 state.value = None; 569 sink(. Push(x)); 570 | (Push(_), _) => () 571 } 572 ); 573 574 sink(. 575 Start( 576 (. signal) => 577 if (!state.ended) { 578 switch (signal) { 579 | Pull when !state.pulled => 580 state.pulled = true; 581 state.sourceTalkback(. Pull); 582 state.notifierTalkback(. Pull); 583 | Pull => () 584 | Close => 585 state.ended = true; 586 state.sourceTalkback(. Close); 587 state.notifierTalkback(. Close); 588 }; 589 }, 590 ), 591 ); 592 }) 593 ); 594 595[@genType] 596let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) => 597 curry(source => 598 curry(sink => { 599 let acc = ref(seed); 600 601 source((. signal) => 602 sink(. 603 switch (signal) { 604 | Push(x) => 605 acc := f(. acc^, x); 606 Push(acc^); 607 | Start(x) => Start(x) 608 | End => End 609 }, 610 ) 611 ); 612 }) 613 ); 614 615type shareStateT('a) = { 616 mutable sinks: Rebel.Array.t(sinkT('a)), 617 mutable talkback: (. talkbackT) => unit, 618 mutable gotSignal: bool, 619}; 620 621[@genType] 622let share = (source: sourceT('a)): sourceT('a) => { 623 let state = { 624 sinks: Rebel.Array.makeEmpty(), 625 talkback: talkbackPlaceholder, 626 gotSignal: false, 627 }; 628 629 sink => { 630 state.sinks = Rebel.Array.append(state.sinks, sink); 631 632 if (Rebel.Array.size(state.sinks) === 1) { 633 source((. signal) => 634 switch (signal) { 635 | Push(_) => 636 state.gotSignal = false; 637 Rebel.Array.forEach(state.sinks, sink => sink(. signal)); 638 | Start(x) => state.talkback = x 639 | End => 640 Rebel.Array.forEach(state.sinks, sink => sink(. End)); 641 state.sinks = Rebel.Array.makeEmpty(); 642 } 643 ); 644 }; 645 646 sink(. 647 Start( 648 (. signal) => 649 switch (signal) { 650 | Close => 651 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); 652 if (Rebel.Array.size(state.sinks) === 0) { 653 state.talkback(. Close); 654 }; 655 | Pull when !state.gotSignal => 656 state.gotSignal = true; 657 state.talkback(. signal); 658 | Pull => () 659 }, 660 ), 661 ); 662 }; 663}; 664 665type skipStateT = { 666 mutable talkback: (. talkbackT) => unit, 667 mutable rest: int, 668}; 669 670[@genType] 671let skip = (wait: int): operatorT('a, 'a) => 672 curry(source => 673 curry(sink => { 674 let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait}; 675 676 source((. signal) => 677 switch (signal) { 678 | Start(tb) => 679 state.talkback = tb; 680 sink(. signal); 681 | Push(_) when state.rest > 0 => 682 state.rest = state.rest - 1; 683 state.talkback(. Pull); 684 | _ => sink(. signal) 685 } 686 ); 687 }) 688 ); 689 690type skipUntilStateT = { 691 mutable sourceTalkback: (. talkbackT) => unit, 692 mutable notifierTalkback: (. talkbackT) => unit, 693 mutable skip: bool, 694 mutable pulled: bool, 695 mutable ended: bool, 696}; 697 698[@genType] 699let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) => 700 curry(source => 701 curry(sink => { 702 let state: skipUntilStateT = { 703 sourceTalkback: talkbackPlaceholder, 704 notifierTalkback: talkbackPlaceholder, 705 skip: true, 706 pulled: false, 707 ended: false, 708 }; 709 710 source((. signal) => 711 switch (signal) { 712 | Start(tb) => 713 state.sourceTalkback = tb; 714 715 notifier((. signal) => 716 switch (signal) { 717 | Start(innerTb) => 718 state.notifierTalkback = innerTb; 719 innerTb(. Pull); 720 | Push(_) => 721 state.skip = false; 722 state.notifierTalkback(. Close); 723 | End when state.skip => 724 state.ended = true; 725 state.sourceTalkback(. Close); 726 | End => () 727 } 728 ); 729 | Push(_) when !state.skip && !state.ended => 730 state.pulled = false; 731 sink(. signal); 732 | Push(_) when !state.pulled => 733 state.pulled = true; 734 state.sourceTalkback(. Pull); 735 state.notifierTalkback(. Pull); 736 | Push(_) => state.pulled = false 737 | End => 738 if (state.skip) { 739 state.notifierTalkback(. Close); 740 }; 741 state.ended = true; 742 sink(. End); 743 } 744 ); 745 746 sink(. 747 Start( 748 (. signal) => 749 if (!state.ended) { 750 switch (signal) { 751 | Close => 752 state.ended = true; 753 state.sourceTalkback(. Close); 754 if (state.skip) { 755 state.notifierTalkback(. Close); 756 }; 757 | Pull when !state.pulled => 758 state.pulled = true; 759 if (state.skip) { 760 state.notifierTalkback(. Pull); 761 }; 762 state.sourceTalkback(. Pull); 763 | Pull => () 764 }; 765 }, 766 ), 767 ); 768 }) 769 ); 770 771type skipWhileStateT = { 772 mutable talkback: (. talkbackT) => unit, 773 mutable skip: bool, 774}; 775 776[@genType] 777let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) => 778 curry(source => 779 curry(sink => { 780 let state: skipWhileStateT = { 781 talkback: talkbackPlaceholder, 782 skip: true, 783 }; 784 785 source((. signal) => 786 switch (signal) { 787 | Start(tb) => 788 state.talkback = tb; 789 sink(. signal); 790 | Push(x) when state.skip => 791 if (f(. x)) { 792 state.talkback(. Pull); 793 } else { 794 state.skip = false; 795 sink(. signal); 796 } 797 | _ => sink(. signal) 798 } 799 ); 800 }) 801 ); 802 803type switchMapStateT('a) = { 804 mutable outerTalkback: (. talkbackT) => unit, 805 mutable outerPulled: bool, 806 mutable innerTalkback: (. talkbackT) => unit, 807 mutable innerActive: bool, 808 mutable innerPulled: bool, 809 mutable ended: bool, 810}; 811 812[@genType] 813let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) => 814 curry(source => 815 curry(sink => { 816 let state: switchMapStateT('a) = { 817 outerTalkback: talkbackPlaceholder, 818 outerPulled: false, 819 innerTalkback: talkbackPlaceholder, 820 innerActive: false, 821 innerPulled: false, 822 ended: false, 823 }; 824 825 let applyInnerSource = innerSource => 826 innerSource((. signal) => 827 switch (signal) { 828 | Start(tb) => 829 state.innerActive = true; 830 state.innerTalkback = tb; 831 state.innerPulled = false; 832 tb(. Pull); 833 | Push(_) when state.innerActive => 834 sink(. signal); 835 if (!state.innerPulled) { 836 state.innerTalkback(. Pull); 837 } else { 838 state.innerPulled = false; 839 }; 840 | Push(_) => () 841 | End when state.innerActive => 842 state.innerActive = false; 843 if (state.ended) { 844 sink(. signal); 845 } else if (!state.outerPulled) { 846 state.outerPulled = true; 847 state.outerTalkback(. Pull); 848 }; 849 | End => () 850 } 851 ); 852 853 source((. signal) => 854 switch (signal) { 855 | Start(tb) => state.outerTalkback = tb 856 | Push(x) when !state.ended => 857 if (state.innerActive) { 858 state.innerTalkback(. Close); 859 state.innerTalkback = talkbackPlaceholder; 860 }; 861 862 if (!state.outerPulled) { 863 state.outerPulled = true; 864 state.outerTalkback(. Pull); 865 } else { 866 state.outerPulled = false; 867 }; 868 869 applyInnerSource(f(. x)); 870 | Push(_) => () 871 | End when !state.ended => 872 state.ended = true; 873 if (!state.innerActive) { 874 sink(. End); 875 }; 876 | End => () 877 } 878 ); 879 880 sink(. 881 Start( 882 (. signal) => 883 switch (signal) { 884 | Pull => 885 if (!state.ended && !state.outerPulled) { 886 state.outerPulled = true; 887 state.outerTalkback(. Pull); 888 }; 889 if (state.innerActive && !state.innerPulled) { 890 state.innerPulled = true; 891 state.innerTalkback(. Pull); 892 }; 893 | Close => 894 if (!state.ended) { 895 state.ended = true; 896 state.outerTalkback(. Close); 897 }; 898 if (state.innerActive) { 899 state.innerActive = false; 900 state.innerTalkback(. Close); 901 }; 902 }, 903 ), 904 ); 905 }) 906 ); 907 908[@genType] 909let switchAll = (source: sourceT(sourceT('a))): sourceT('a) => 910 switchMap((. x) => x, source); 911 912type takeStateT = { 913 mutable ended: bool, 914 mutable taken: int, 915 mutable talkback: (. talkbackT) => unit, 916}; 917 918[@genType] 919let take = (max: int): operatorT('a, 'a) => 920 curry(source => 921 curry(sink => { 922 let state: takeStateT = { 923 ended: false, 924 taken: 0, 925 talkback: talkbackPlaceholder, 926 }; 927 928 source((. signal) => 929 switch (signal) { 930 | Start(tb) when max <= 0 => 931 state.ended = true; 932 sink(. End); 933 tb(. Close); 934 | Start(tb) => state.talkback = tb 935 | Push(_) when state.taken < max && !state.ended => 936 state.taken = state.taken + 1; 937 sink(. signal); 938 if (!state.ended && state.taken >= max) { 939 state.ended = true; 940 sink(. End); 941 state.talkback(. Close); 942 }; 943 | Push(_) => () 944 | End when !state.ended => 945 state.ended = true; 946 sink(. End); 947 | End => () 948 } 949 ); 950 951 sink(. 952 Start( 953 (. signal) => 954 if (!state.ended) { 955 switch (signal) { 956 | Pull when state.taken < max => state.talkback(. Pull) 957 | Pull => () 958 | Close => 959 state.ended = true; 960 state.talkback(. Close); 961 }; 962 }, 963 ), 964 ); 965 }) 966 ); 967 968[@genType] 969let takeLast = (max: int): operatorT('a, 'a) => 970 curry(source => 971 curry(sink => { 972 open Rebel; 973 let talkback = ref(talkbackPlaceholder); 974 let queue = MutableQueue.make(); 975 976 source((. signal) => 977 switch (signal) { 978 | Start(tb) when max <= 0 => 979 tb(. Close); 980 Wonka_sources.empty(sink); 981 | Start(tb) => 982 talkback := tb; 983 tb(. Pull); 984 | Push(x) => 985 let size = MutableQueue.size(queue); 986 if (size >= max && max > 0) { 987 ignore(MutableQueue.pop(queue)); 988 }; 989 990 MutableQueue.add(queue, x); 991 talkback^(. Pull); 992 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue)) 993 } 994 ); 995 }) 996 ); 997 998type takeUntilStateT = { 999 mutable ended: bool, 1000 mutable sourceTalkback: (. talkbackT) => unit, 1001 mutable notifierTalkback: (. talkbackT) => unit, 1002}; 1003 1004[@genType] 1005let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) => 1006 curry(source => 1007 curry(sink => { 1008 let state: takeUntilStateT = { 1009 ended: false, 1010 sourceTalkback: talkbackPlaceholder, 1011 notifierTalkback: talkbackPlaceholder, 1012 }; 1013 1014 source((. signal) => 1015 switch (signal) { 1016 | Start(tb) => 1017 state.sourceTalkback = tb; 1018 1019 notifier((. signal) => 1020 switch (signal) { 1021 | Start(innerTb) => 1022 state.notifierTalkback = innerTb; 1023 innerTb(. Pull); 1024 | Push(_) => 1025 state.ended = true; 1026 state.sourceTalkback(. Close); 1027 sink(. End); 1028 | End => () 1029 } 1030 ); 1031 | End when !state.ended => 1032 state.ended = true; 1033 state.notifierTalkback(. Close); 1034 sink(. End); 1035 | End => () 1036 | Push(_) when !state.ended => sink(. signal) 1037 | Push(_) => () 1038 } 1039 ); 1040 1041 sink(. 1042 Start( 1043 (. signal) => 1044 if (!state.ended) { 1045 switch (signal) { 1046 | Close => 1047 state.ended = true; 1048 state.sourceTalkback(. Close); 1049 state.notifierTalkback(. Close); 1050 | Pull => state.sourceTalkback(. Pull) 1051 }; 1052 }, 1053 ), 1054 ); 1055 }) 1056 ); 1057 1058type takeWhileStateT = { 1059 mutable talkback: (. talkbackT) => unit, 1060 mutable ended: bool, 1061}; 1062 1063[@genType] 1064let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) => 1065 curry(source => 1066 curry(sink => { 1067 let state: takeWhileStateT = { 1068 talkback: talkbackPlaceholder, 1069 ended: false, 1070 }; 1071 1072 source((. signal) => 1073 switch (signal) { 1074 | Start(tb) => 1075 state.talkback = tb; 1076 sink(. signal); 1077 | End when !state.ended => 1078 state.ended = true; 1079 sink(. End); 1080 | End => () 1081 | Push(x) when !state.ended => 1082 if (!f(. x)) { 1083 state.ended = true; 1084 sink(. End); 1085 state.talkback(. Close); 1086 } else { 1087 sink(. signal); 1088 } 1089 | Push(_) => () 1090 } 1091 ); 1092 }) 1093 );