1open Wonka_types;
2open Wonka_helpers;
3
/* Per-subscription state of the `buffer` operator. */
type bufferStateT('a) = {
  /* Source values queued up since the previous emission */
  mutable buffer: Rebel.MutableQueue.t('a),
  /* Talkback received with the source's Start signal */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received with the notifier's Start signal */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Raised whenever a Pull is forwarded to both talkbacks; lowered by a
     source Push that finds it raised */
  mutable pulled: bool,
  /* Raised once either input ended or the sink sent Close */
  mutable ended: bool,
};

/* Queues the values pushed by `source` and emits them to the sink as one
   array each time `notifier` pushes (empty batches are skipped). When either
   input ends, the other one is closed, any queued values are emitted, and
   End is sent. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      let hasBuffered = () => Rebel.MutableQueue.size(state.buffer) > 0;

      /* Emits the leftover values (when there are any), then End */
      let flushAndEnd = () => {
        if (hasBuffered()) {
          sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
        };
        sink(. End);
      };

      /* Marks the pull as outstanding and asks the source first, the
         notifier second */
      let pullBoth = () => {
        state.pulled = true;
        state.sourceTalkback(. Pull);
        state.notifierTalkback(. Pull);
      };

      let onNotifier = (. signal) =>
        switch (signal) {
        | Start(talkback) => state.notifierTalkback = talkback
        | Push(_) =>
          if (!state.ended && hasBuffered()) {
            /* Swap in a fresh queue before emitting the collected batch */
            let batch = state.buffer;
            state.buffer = Rebel.MutableQueue.make();
            sink(. Push(Rebel.MutableQueue.toArray(batch)));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            state.sourceTalkback(. Close);
            flushAndEnd();
          }
        };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          state.sourceTalkback = talkback;
          /* The notifier is only subscribed once the source has started */
          notifier(onNotifier);
        | Push(value) =>
          if (!state.ended) {
            Rebel.MutableQueue.add(state.buffer, value);
            if (state.pulled) {
              state.pulled = false;
            } else {
              pullBoth();
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            state.notifierTalkback(. Close);
            flushAndEnd();
          }
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | _ when state.ended => ()
            | Close =>
              state.ended = true;
              state.sourceTalkback(. Close);
              state.notifierTalkback(. Close);
            | Pull =>
              if (!state.pulled) {
                pullBoth();
              }
            },
        ),
      );
    })
  );
92
/* Per-subscription state of the `combine` source. */
type combineStateT('a, 'b) = {
  /* Talkbacks received with the Start signal of each input */
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  /* Most recent value pushed by each input, if any */
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  /* Raised when the sink's Pull was forwarded to both inputs; lowered again
     by the pushes that follow */
  mutable gotSignal: bool,
  /* Number of End signals that were only counted, not forwarded */
  mutable endCounter: int,
  /* Raised once End was forwarded or the sink sent Close */
  mutable ended: bool,
};

/* Combines two sources into a source of pairs. A pair is emitted on every
   push once both inputs have produced at least one value; End is forwarded
   when the second End signal arrives. */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    /* The first End is only counted; the next one terminates the sink */
    let onInputEnd = () =>
      if (state.endCounter < 1) {
        state.endCounter = state.endCounter + 1;
      } else if (!state.ended) {
        state.ended = true;
        sink(. End);
      };

    sourceA((. signal) =>
      switch (signal) {
      | Start(talkback) => state.talkbackA = talkback
      | Push(a) =>
        switch (state.lastValB) {
        | None =>
          state.lastValA = Some(a);
          /* B has no value yet: ask it for one unless a pull is pending */
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackB(. Pull);
          };
        | Some(b) =>
          if (!state.ended) {
            state.lastValA = Some(a);
            state.gotSignal = false;
            sink(. Push((a, b)));
          }
        }
      | End => onInputEnd()
      }
    );

    sourceB((. signal) =>
      switch (signal) {
      | Start(talkback) => state.talkbackB = talkback
      | Push(b) =>
        switch (state.lastValA) {
        | None =>
          state.lastValB = Some(b);
          /* A has no value yet: ask it for one unless a pull is pending */
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackA(. Pull);
          };
        | Some(a) =>
          if (!state.ended) {
            state.lastValB = Some(b);
            state.gotSignal = false;
            sink(. Push((a, b)));
          }
        }
      | End => onInputEnd()
      }
    );

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | _ when state.ended => ()
          | Close =>
            state.ended = true;
            state.talkbackA(. Close);
            state.talkbackB(. Close);
          | Pull =>
            if (!state.gotSignal) {
              state.gotSignal = true;
              state.talkbackA(. Pull);
              state.talkbackB(. Pull);
            }
          },
      ),
    );
  });
182
/* Per-subscription state of the `concatMap` operator. */
type concatMapStateT('a) = {
  /* Outer values waiting for the active inner source to end */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback of the outer source */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Raised while a Pull sent to the outer source is unanswered */
  mutable outerPulled: bool,
  /* Talkback of the most recently started inner source */
  mutable innerTalkback: (. talkbackT) => unit,
  /* Raised while an inner source is subscribed and has not ended */
  mutable innerActive: bool,
  /* Raised when the sink's Pull was forwarded to the inner source */
  mutable innerPulled: bool,
  /* Raised once the outer source ended or the sink sent Close */
  mutable ended: bool,
};

/* Maps every outer value to an inner source via `f` and forwards the inner
   sources' values one source at a time: outer values that arrive while an
   inner source is active are queued and started when that source ends. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let pullOuter = () => {
        state.outerPulled = true;
        state.outerTalkback(. Pull);
      };

      /* Subscribes to one inner source; recurses into the next queued input
         when that source ends */
      let rec runInner = inner => {
        state.innerActive = true;
        inner((. signal) =>
          switch (signal) {
          | Start(talkback) =>
            state.innerTalkback = talkback;
            state.innerPulled = false;
            talkback(. Pull);
          | Push(_) =>
            if (state.innerActive) {
              sink(. signal);
              if (state.innerPulled) {
                state.innerPulled = false;
              } else {
                state.innerTalkback(. Pull);
              };
            }
          | End =>
            if (state.innerActive) {
              state.innerActive = false;
              switch (Rebel.MutableQueue.pop(state.inputQueue)) {
              | Some(next) => runInner(f(. next))
              | None =>
                if (state.ended) {
                  sink(. End);
                } else if (!state.outerPulled) {
                  pullOuter();
                }
              };
            }
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) => state.outerTalkback = talkback
        | Push(input) =>
          if (!state.ended) {
            state.outerPulled = false;
            if (state.innerActive) {
              Rebel.MutableQueue.add(state.inputQueue, input);
            } else {
              runInner(f(. input));
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            /* With an inner source still running or inputs still queued,
               End is sent later from the inner End handler */
            if (!state.innerActive
                && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                pullOuter();
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );
289
/* Flattens a source of sources with `concatMap`, using each emitted source
   unchanged as the inner source. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) => {
  let identity = (. inner) => inner;
  concatMap(identity, source);
};
293
/* Turns the array into a source of sources via `Wonka_sources.fromArray`
   and flattens it with `concatMap`. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  Wonka_sources.fromArray(sources) |> concatMap((. inner) => inner);
297
/* Forwards only the values for which `f` returns true. A rejected value is
   dropped and a Pull is sent upstream in its place. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let upstream = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          upstream := talkback;
          sink(. signal);
        | Push(value) =>
          if (f(. value)) {
            sink(. signal);
          } else {
            upstream^(. Pull);
          }
        | End => sink(. signal)
        }
      );
    })
  );
316
/* Applies `f` to every pushed value; Start and End pass through. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        /* Each signal is rebuilt rather than forwarded as-is so that genType
           generates the correct generics during codegen */
        switch (signal) {
        | Start(talkback) => sink(. Start(talkback))
        | Push(value) => sink(. Push(f(. value)))
        | End => sink(. End)
        }
      )
    )
  );
334
/* Per-subscription state of the `mergeMap` operator. */
type mergeMapStateT = {
  /* Talkback of the outer source */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Raised while a Pull sent to the outer source is unanswered */
  mutable outerPulled: bool,
  /* Talkbacks of all inner sources that started and have not ended */
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  /* Raised once the outer source ended or the sink sent Close */
  mutable ended: bool,
};

/* Maps every outer value to an inner source via `f`, subscribes to it right
   away and forwards the values of all active inner sources. End is sent once
   the outer source has ended and no inner source is left. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let noInnerLeft = () => Rebel.Array.size(state.innerTalkbacks) === 0;

      let pullOuter = () => {
        state.outerPulled = true;
        state.outerTalkback(. Pull);
      };

      let subscribeInner = inner => {
        /* Remembers this inner source's own talkback so it can be pulled
           and later removed from the active list */
        let own = ref(talkbackPlaceholder);

        inner((. signal) =>
          switch (signal) {
          | Start(talkback) =>
            own := talkback;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, talkback);
            talkback(. Pull);
          | Push(value) =>
            if (!noInnerLeft()) {
              sink(. Push(value));
              own^(. Pull);
            }
          | End =>
            if (!noInnerLeft()) {
              state.innerTalkbacks =
                Rebel.Array.filter(state.innerTalkbacks, other =>
                  other !== own^
                );
              if (noInnerLeft()) {
                if (state.ended) {
                  sink(. End);
                } else if (!state.outerPulled) {
                  pullOuter();
                };
              };
            }
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) => state.outerTalkback = talkback
        | Push(input) =>
          if (!state.ended) {
            state.outerPulled = false;
            subscribeInner(f(. input));
            /* The inner subscription may already have pulled the outer
               source; only pull when it did not */
            if (!state.outerPulled) {
              pullOuter();
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            if (noInnerLeft()) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };

              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Close)
              );
              state.innerTalkbacks = Rebel.Array.makeEmpty();
            | Pull =>
              if (!state.outerPulled && !state.ended) {
                pullOuter();
              } else {
                state.outerPulled = false;
              };

              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Pull)
              );
            },
        ),
      );
    })
  );
428
/* Turns the array into a source of sources via `Wonka_sources.fromArray`
   and flattens it with `mergeMap`. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  Wonka_sources.fromArray(sources) |> mergeMap((. inner) => inner);
432
/* Flattens a source of sources with `mergeMap`, using each emitted source
   unchanged as the inner source. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) => {
  let identity = (. inner) => inner;
  mergeMap(identity, source);
};

/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;
439
/* Runs `f` exactly once when the stream terminates: either after the
   source's End was forwarded to the sink, or after the sink's Close was
   forwarded to the source. Signals arriving afterwards are dropped. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let finished = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          /* Wrap the upstream talkback so a Close can be observed */
          let talkback = (. request) =>
            if (! finished^) {
              switch (request) {
              | Pull => upstream(. Pull)
              | Close =>
                finished := true;
                upstream(. Close);
                f(.);
              };
            };
          sink(. Start(talkback));
        | Push(_) =>
          if (! finished^) {
            sink(. signal);
          }
        | End =>
          if (! finished^) {
            finished := true;
            sink(. signal);
            f(.);
          }
        }
      );
    })
  );
473
/* Calls `f` with every pushed value before forwarding it to the sink.
   Once End was forwarded or the sink sent Close, further signals are
   dropped and `f` is no longer called. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let closed = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          /* Wrap the upstream talkback so a Close can be observed */
          let talkback = (. request) =>
            if (! closed^) {
              switch (request) {
              | Pull => upstream(. Pull)
              | Close =>
                closed := true;
                upstream(. Close);
              };
            };
          sink(. Start(talkback));
        | Push(value) =>
          if (! closed^) {
            f(. value);
            sink(. signal);
          }
        | End =>
          if (! closed^) {
            closed := true;
            sink(. signal);
          }
        }
      );
    })
  );

/* Alias of `onPush`. */
[@genType]
let tap = onPush;
511
/* Forwards every signal unchanged and calls `f` right after the Start
   signal has been handed to the sink. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        sink(. signal);
        switch (signal) {
        | Start(_) => f(.)
        | Push(_)
        | End => ()
        };
      })
    )
  );
526
/* Per-subscription state of the `sample` operator. */
type sampleStateT('a) = {
  /* Talkback received with the source's Start signal */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received with the notifier's Start signal */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Latest source value that has not been emitted yet */
  mutable value: option('a),
  /* Raised whenever a Pull is forwarded to both talkbacks; lowered by a
     source Push that finds it raised */
  mutable pulled: bool,
  /* Raised once either input ended or the sink sent Close */
  mutable ended: bool,
};

/* Remembers the latest value pushed by `source` and emits it to the sink
   when `notifier` pushes; each remembered value is emitted at most once.
   When either input ends, the other one is closed and End is sent. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        /* Guarded by `ended` like every other branch: a push arriving after
           termination must not send Pull to talkbacks that were closed */
        | Push(x) when !state.ended =>
          state.value = Some(x);
          if (!state.pulled) {
            state.pulled = true;
            state.notifierTalkback(. Pull);
            state.sourceTalkback(. Pull);
          } else {
            state.pulled = false;
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          /* Clear the slot first so the same value is not emitted twice */
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );
602
/* Folds the pushed values with `f`, starting from `seed`, and emits every
   intermediate accumulator value. Start and End pass through. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) => sink(. Start(talkback))
        | Push(value) =>
          let next = f(. acc^, value);
          acc := next;
          sink(. Push(next));
        | End => sink(. End)
        }
      );
    })
  );
622
/* State shared by all subscribers of one `share`d source. */
type shareStateT('a) = {
  /* Sinks that are currently subscribed */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Talkback of the current upstream subscription */
  mutable talkback: (. talkbackT) => unit,
  /* Raised while a Pull sent upstream has not been answered by a Push, so
     that Pulls from several sinks are not duplicated */
  mutable gotSignal: bool,
};

/* Shares a single subscription to `source` between all sinks. The source is
   subscribed when the sink list goes from empty to one entry and closed
   when the last sink sends Close; every Push and End is sent to all
   sinks. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      /* A new upstream subscription starts without an outstanding Pull.
         The flag may still be raised from the previous subscription (a Pull
         answered by End, or the last sink closing while a Pull was
         pending), which would make every Pull of the new subscriber be
         ignored. */
      state.gotSignal = false;

      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          Rebel.Array.forEach(state.sinks, sink => sink(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            /* Unsubscribe this sink; close upstream with the last one */
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};
672
/* Per-subscription state of the `skip` operator. */
type skipStateT = {
  /* Talkback of the source */
  mutable talkback: (. talkbackT) => unit,
  /* Number of values that still have to be dropped */
  mutable rest: int,
};

/* Drops the first `wait` values of the source, sending a Pull upstream for
   each dropped value, and forwards everything after that. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          state.talkback = upstream;
          sink(. signal);
        | Push(_) =>
          if (state.rest > 0) {
            state.rest = state.rest - 1;
            state.talkback(. Pull);
          } else {
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
697
/* Per-subscription state of the `skipUntil` operator. */
type skipUntilStateT = {
  /* Talkback received with the source's Start signal */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received with the notifier's Start signal */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* True until the notifier pushes its first value */
  mutable skip: bool,
  /* Raised while a forwarded Pull has not been answered by a source Push */
  mutable pulled: bool,
  /* Raised once the stream terminated or the sink sent Close */
  mutable ended: bool,
};

/* Drops the source's values until `notifier` pushes for the first time and
   forwards them afterwards. The notifier is closed as soon as it has
   pushed. If the notifier ends before pushing, the source is closed. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      let onNotifier = (. signal) =>
        switch (signal) {
        | Start(talkback) =>
          state.notifierTalkback = talkback;
          talkback(. Pull);
        | Push(_) =>
          state.skip = false;
          state.notifierTalkback(. Close);
        | End =>
          if (state.skip) {
            state.ended = true;
            state.sourceTalkback(. Close);
          }
        };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          state.sourceTalkback = talkback;
          /* The notifier is only subscribed once the source has started */
          notifier(onNotifier);
        | Push(_) =>
          if (!state.skip && !state.ended) {
            state.pulled = false;
            sink(. signal);
          } else if (!state.pulled) {
            /* Value is dropped: ask both inputs to carry on */
            state.pulled = true;
            state.sourceTalkback(. Pull);
            state.notifierTalkback(. Pull);
          } else {
            state.pulled = false;
          }
        | End =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | _ when state.ended => ()
            | Close =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (state.skip) {
                state.notifierTalkback(. Close);
              };
            | Pull =>
              if (!state.pulled) {
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              }
            },
        ),
      );
    })
  );
780
/* Per-subscription state of the `skipWhile` operator. */
type skipWhileStateT = {
  /* Talkback of the source */
  mutable talkback: (. talkbackT) => unit,
  /* True until the first value for which the predicate returned false */
  mutable skip: bool,
};

/* Drops values while `f` returns true for them, sending a Pull upstream for
   each dropped value. From the first value that `f` rejects onwards,
   everything is forwarded and `f` is no longer called. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          state.talkback = upstream;
          sink(. signal);
        | Push(value) =>
          /* `&&` short-circuits, so `f` only runs while still skipping */
          if (state.skip && f(. value)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
812
/* Per-subscription state of the `switchMap` operator. */
type switchMapStateT('a) = {
  /* Talkback of the outer source */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Raised while a Pull sent to the outer source is unanswered */
  mutable outerPulled: bool,
  /* Talkback of the most recently started inner source */
  mutable innerTalkback: (. talkbackT) => unit,
  /* Raised while an inner source is subscribed and has not ended */
  mutable innerActive: bool,
  /* Raised when the sink's Pull was forwarded to the inner source */
  mutable innerPulled: bool,
  /* Raised once the outer source ended or the sink sent Close */
  mutable ended: bool,
};

/* Maps every outer value to an inner source via `f` and forwards that
   source's values. A new outer value closes the currently active inner
   source before the next one is subscribed. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let pullOuter = () => {
        state.outerPulled = true;
        state.outerTalkback(. Pull);
      };

      let runInner = inner => {
        state.innerActive = true;
        inner((. signal) =>
          switch (signal) {
          /* Nothing is handled while no inner source is marked active */
          | _ when !state.innerActive => ()
          | Start(talkback) =>
            state.innerTalkback = talkback;
            state.innerPulled = false;
            talkback(. Pull);
          | Push(_) =>
            sink(. signal);
            if (state.innerPulled) {
              state.innerPulled = false;
            } else {
              state.innerTalkback(. Pull);
            };
          | End =>
            state.innerActive = false;
            if (state.ended) {
              sink(. signal);
            } else if (!state.outerPulled) {
              pullOuter();
            };
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) => state.outerTalkback = talkback
        | Push(input) =>
          if (!state.ended) {
            /* Drop the running inner source in favour of the new one */
            if (state.innerActive) {
              state.innerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
            };

            if (state.outerPulled) {
              state.outerPulled = false;
            } else {
              pullOuter();
            };

            runInner(f(. input));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            /* With an inner source still running, End is sent when it ends */
            if (!state.innerActive) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                pullOuter();
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              if (state.innerActive) {
                state.innerActive = false;
                state.innerTalkback(. Close);
              };
            },
        ),
      );
    })
  );
920
/* Flattens a source of sources with `switchMap`, using each emitted source
   unchanged as the inner source. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) => {
  let identity = (. inner) => inner;
  switchMap(identity, source);
};
924
/* Per-subscription state of the `take` operator. */
type takeStateT = {
  /* Raised once End was sent to the sink or the sink sent Close */
  mutable ended: bool,
  /* Number of values forwarded so far */
  mutable taken: int,
  /* Talkback of the source */
  mutable talkback: (. talkbackT) => unit,
};

/* Forwards at most `max` values, then sends End to the sink and Close to
   the source. With `max <= 0` the stream is ended as soon as the source
   starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          if (max <= 0) {
            state.ended = true;
            sink(. End);
            upstream(. Close);
          } else {
            state.talkback = upstream;
          }
        | Push(_) =>
          if (state.taken < max && !state.ended) {
            state.taken = state.taken + 1;
            sink(. signal);
            /* The sink may have sent Close while handling the push, hence
               the second look at `ended` */
            if (!state.ended && state.taken >= max) {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | _ when state.ended => ()
            | Pull =>
              if (state.taken < max) {
                state.talkback(. Pull);
              }
            | Close =>
              state.ended = true;
              state.talkback(. Close);
            },
        ),
      );
    })
  );
980
/* Per-subscription state of the `takeLast` operator. */
type takeLastStateT('a) = {
  /* The most recent values, oldest first, capped at `max` entries */
  mutable queue: Rebel.MutableQueue.t('a),
  /* Talkback of the source */
  mutable talkback: (. talkbackT) => unit,
};

/* Pulls the source to completion while keeping only its last `max` values,
   then replays the kept values to the sink through
   `Wonka_sources.fromArray`. With `max <= 0` the source is closed right
   away and the sink is handed to `Wonka_sources.empty`. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeLastStateT('a) = {
        queue: Rebel.MutableQueue.make(),
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          if (max <= 0) {
            upstream(. Close);
            Wonka_sources.empty(sink);
          } else {
            state.talkback = upstream;
            upstream(. Pull);
          }
        | Push(value) =>
          /* Evict the oldest value once the queue is full */
          if (max > 0 && Rebel.MutableQueue.size(state.queue) >= max) {
            ignore(Rebel.MutableQueue.pop(state.queue));
          };

          Rebel.MutableQueue.add(state.queue, value);
          state.talkback(. Pull);
        | End =>
          Wonka_sources.fromArray(
            Rebel.MutableQueue.toArray(state.queue),
            sink,
          )
        }
      );
    })
  );
1021
/* Per-subscription state of the `takeUntil` operator. */
type takeUntilStateT = {
  /* Raised once End was sent to the sink or the sink sent Close */
  mutable ended: bool,
  /* Talkback received with the source's Start signal */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received with the notifier's Start signal */
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* Forwards the source's values until `notifier` pushes for the first time.
   At that point both the notifier and the source are closed and End is
   sent to the sink. If the source ends first, the notifier is closed and
   End is forwarded. A notifier that ends without pushing has no effect. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) => {
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) => {
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            /* Only the first push while still running terminates the
               stream; the guard keeps later pushes from sending a second
               End to the sink and a second Close to the source */
            | Push(_) when !state.ended =>
              state.ended = true;
              /* The notifier has done its job: release it as well */
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            };
            ();
          });
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        };
        ();
      });

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );
1083
/* Per-subscription state of the `takeWhile` operator. */
type takeWhileStateT = {
  /* Talkback of the source */
  mutable talkback: (. talkbackT) => unit,
  /* Raised once End was sent to the sink */
  mutable ended: bool,
};

/* Forwards values for as long as `f` returns true for them. The first
   rejected value is dropped; End is sent to the sink and Close to the
   source. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          state.talkback = upstream;
          sink(. signal);
        | Push(value) =>
          if (!state.ended) {
            if (f(. value)) {
              sink(. signal);
            } else {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );
    })
  );