1open Wonka_types;
2open Wonka_helpers;
3
4type bufferStateT('a) = {
5 mutable buffer: Rebel.MutableQueue.t('a),
6 mutable sourceTalkback: (. talkbackT) => unit,
7 mutable notifierTalkback: (. talkbackT) => unit,
8 mutable pulled: bool,
9 mutable ended: bool,
10};
11
12[@genType]
13let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
14 curry(source =>
15 curry(sink => {
16 let state = {
17 buffer: Rebel.MutableQueue.make(),
18 sourceTalkback: talkbackPlaceholder,
19 notifierTalkback: talkbackPlaceholder,
20 pulled: false,
21 ended: false,
22 };
23
24 source((. signal) =>
25 switch (signal) {
26 | Start(tb) =>
27 state.sourceTalkback = tb;
28
29 notifier((. signal) =>
30 switch (signal) {
31 | Start(tb) => state.notifierTalkback = tb
32 | Push(_) when !state.ended =>
33 if (Rebel.MutableQueue.size(state.buffer) > 0) {
34 let buffer = state.buffer;
35 state.buffer = Rebel.MutableQueue.make();
36 sink(. Push(Rebel.MutableQueue.toArray(buffer)));
37 }
38 | Push(_) => ()
39 | End when !state.ended =>
40 state.ended = true;
41 state.sourceTalkback(. Close);
42 if (Rebel.MutableQueue.size(state.buffer) > 0) {
43 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
44 };
45 sink(. End);
46 | End => ()
47 }
48 );
49 | Push(value) when !state.ended =>
50 Rebel.MutableQueue.add(state.buffer, value);
51 if (!state.pulled) {
52 state.pulled = true;
53 state.sourceTalkback(. Pull);
54 state.notifierTalkback(. Pull);
55 } else {
56 state.pulled = false;
57 };
58 | Push(_) => ()
59 | End when !state.ended =>
60 state.ended = true;
61 state.notifierTalkback(. Close);
62 if (Rebel.MutableQueue.size(state.buffer) > 0) {
63 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
64 };
65 sink(. End);
66 | End => ()
67 }
68 );
69
70 sink(.
71 Start(
72 (. signal) =>
73 if (!state.ended) {
74 switch (signal) {
75 | Close =>
76 state.ended = true;
77 state.sourceTalkback(. Close);
78 state.notifierTalkback(. Close);
79 | Pull when !state.pulled =>
80 state.pulled = true;
81 state.sourceTalkback(. Pull);
82 state.notifierTalkback(. Pull);
83 | Pull => ()
84 };
85 },
86 ),
87 );
88 })
89 );
90
91type combineStateT('a, 'b) = {
92 mutable talkbackA: (. talkbackT) => unit,
93 mutable talkbackB: (. talkbackT) => unit,
94 mutable lastValA: option('a),
95 mutable lastValB: option('b),
96 mutable gotSignal: bool,
97 mutable endCounter: int,
98 mutable ended: bool,
99};
100
101[@genType]
102let combine =
103 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
104 curry(sink => {
105 let state = {
106 talkbackA: talkbackPlaceholder,
107 talkbackB: talkbackPlaceholder,
108 lastValA: None,
109 lastValB: None,
110 gotSignal: false,
111 endCounter: 0,
112 ended: false,
113 };
114
115 sourceA((. signal) =>
116 switch (signal, state.lastValB) {
117 | (Start(tb), _) => state.talkbackA = tb
118 | (Push(a), None) =>
119 state.lastValA = Some(a);
120 if (!state.gotSignal) {
121 state.talkbackB(. Pull);
122 } else {
123 state.gotSignal = false;
124 };
125 | (Push(a), Some(b)) when !state.ended =>
126 state.lastValA = Some(a);
127 state.gotSignal = false;
128 sink(. Push((a, b)));
129 | (End, _) when state.endCounter < 1 =>
130 state.endCounter = state.endCounter + 1
131 | (End, _) when !state.ended =>
132 state.ended = true;
133 sink(. End);
134 | _ => ()
135 }
136 );
137
138 sourceB((. signal) =>
139 switch (signal, state.lastValA) {
140 | (Start(tb), _) => state.talkbackB = tb
141 | (Push(b), None) =>
142 state.lastValB = Some(b);
143 if (!state.gotSignal) {
144 state.talkbackA(. Pull);
145 } else {
146 state.gotSignal = false;
147 };
148 | (Push(b), Some(a)) when !state.ended =>
149 state.lastValB = Some(b);
150 state.gotSignal = false;
151 sink(. Push((a, b)));
152 | (End, _) when state.endCounter < 1 =>
153 state.endCounter = state.endCounter + 1
154 | (End, _) when !state.ended =>
155 state.ended = true;
156 sink(. End);
157 | _ => ()
158 }
159 );
160
161 sink(.
162 Start(
163 (. signal) =>
164 if (!state.ended) {
165 switch (signal) {
166 | Close =>
167 state.ended = true;
168 state.talkbackA(. Close);
169 state.talkbackB(. Close);
170 | Pull when !state.gotSignal =>
171 state.gotSignal = true;
172 state.talkbackA(. signal);
173 state.talkbackB(. signal);
174 | Pull => ()
175 };
176 },
177 ),
178 );
179 });
180
181type concatMapStateT('a) = {
182 inputQueue: Rebel.MutableQueue.t('a),
183 mutable outerTalkback: (. talkbackT) => unit,
184 mutable outerPulled: bool,
185 mutable innerTalkback: (. talkbackT) => unit,
186 mutable innerActive: bool,
187 mutable innerPulled: bool,
188 mutable ended: bool,
189};
190
191[@genType]
192let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
193 curry(source =>
194 curry(sink => {
195 let state: concatMapStateT('a) = {
196 inputQueue: Rebel.MutableQueue.make(),
197 outerTalkback: talkbackPlaceholder,
198 outerPulled: false,
199 innerTalkback: talkbackPlaceholder,
200 innerActive: false,
201 innerPulled: false,
202 ended: false,
203 };
204
205 let rec applyInnerSource = innerSource =>
206 innerSource((. signal) =>
207 switch (signal) {
208 | Start(tb) =>
209 state.innerActive = true;
210 state.innerTalkback = tb;
211 state.innerPulled = false;
212 tb(. Pull);
213 | Push(_) when state.innerActive =>
214 sink(. signal);
215 if (!state.innerPulled) {
216 state.innerTalkback(. Pull);
217 } else {
218 state.innerPulled = false;
219 };
220 | Push(_) => ()
221 | End when state.innerActive =>
222 state.innerActive = false;
223 switch (Rebel.MutableQueue.pop(state.inputQueue)) {
224 | Some(input) => applyInnerSource(f(. input))
225 | None when state.ended => sink(. End)
226 | None when !state.outerPulled =>
227 state.outerPulled = true;
228 state.outerTalkback(. Pull);
229 | None => ()
230 };
231 | End => ()
232 }
233 );
234
235 source((. signal) =>
236 switch (signal) {
237 | Start(tb) => state.outerTalkback = tb
238 | Push(x) when !state.ended =>
239 state.outerPulled = false;
240 if (state.innerActive) {
241 Rebel.MutableQueue.add(state.inputQueue, x);
242 } else {
243 applyInnerSource(f(. x));
244 };
245 | Push(_) => ()
246 | End when !state.ended =>
247 state.ended = true;
248 if (!state.innerActive
249 && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
250 sink(. End);
251 };
252 | End => ()
253 }
254 );
255
256 sink(.
257 Start(
258 (. signal) =>
259 switch (signal) {
260 | Pull =>
261 if (!state.ended && !state.outerPulled) {
262 state.outerPulled = true;
263 state.outerTalkback(. Pull);
264 };
265 if (state.innerActive && !state.innerPulled) {
266 state.innerPulled = true;
267 state.innerTalkback(. Pull);
268 };
269 | Close =>
270 if (!state.ended) {
271 state.ended = true;
272 state.outerTalkback(. Close);
273 };
274 if (state.innerActive) {
275 state.innerActive = false;
276 state.innerTalkback(. Close);
277 };
278 },
279 ),
280 );
281 })
282 );
283
284[@genType]
285let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
286 concatMap((. x) => x, source);
287
288[@genType]
289let concat = (sources: array(sourceT('a))): sourceT('a) =>
290 concatMap((. x) => x, Wonka_sources.fromArray(sources));
291
292[@genType]
293let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
294 curry(source =>
295 curry(sink => {
296 let talkback = ref(talkbackPlaceholder);
297
298 source((. signal) =>
299 switch (signal) {
300 | Start(tb) =>
301 talkback := tb;
302 sink(. signal);
303 | Push(x) when !f(. x) => talkback^(. Pull)
304 | _ => sink(. signal)
305 }
306 );
307 })
308 );
309
310[@genType]
311let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
312 curry(source =>
313 curry(sink =>
314 source((. signal) =>
315 sink(.
316 /* The signal needs to be recreated for genType to generate
317 the correct generics during codegen */
318 switch (signal) {
319 | Start(x) => Start(x)
320 | Push(x) => Push(f(. x))
321 | End => End
322 },
323 )
324 )
325 )
326 );
327
328type mergeMapStateT = {
329 mutable outerTalkback: (. talkbackT) => unit,
330 mutable outerPulled: bool,
331 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
332 mutable ended: bool,
333};
334
335[@genType]
336let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
337 curry(source =>
338 curry(sink => {
339 let state: mergeMapStateT = {
340 outerTalkback: talkbackPlaceholder,
341 outerPulled: false,
342 innerTalkbacks: Rebel.Array.makeEmpty(),
343 ended: false,
344 };
345
346 let applyInnerSource = innerSource => {
347 let talkback = ref(talkbackPlaceholder);
348
349 innerSource((. signal) =>
350 switch (signal) {
351 | Start(tb) =>
352 talkback := tb;
353 state.innerTalkbacks =
354 Rebel.Array.append(state.innerTalkbacks, tb);
355 tb(. Pull);
356 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
357 sink(. Push(x));
358 talkback^(. Pull);
359 | Push(_) => ()
360 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
361 state.innerTalkbacks =
362 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
363 let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
364 if (state.ended && exhausted) {
365 sink(. End);
366 } else if (!state.outerPulled && exhausted) {
367 state.outerPulled = true;
368 state.outerTalkback(. Pull);
369 };
370 | End => ()
371 }
372 );
373 };
374
375 source((. signal) =>
376 switch (signal) {
377 | Start(tb) => state.outerTalkback = tb
378 | Push(x) when !state.ended =>
379 state.outerPulled = false;
380 applyInnerSource(f(. x));
381 if (!state.outerPulled) {
382 state.outerPulled = true;
383 state.outerTalkback(. Pull);
384 };
385 | Push(_) => ()
386 | End when !state.ended =>
387 state.ended = true;
388 if (Rebel.Array.size(state.innerTalkbacks) === 0) {
389 sink(. End);
390 };
391 | End => ()
392 }
393 );
394
395 sink(.
396 Start(
397 (. signal) =>
398 switch (signal) {
399 | Close =>
400 if (!state.ended) {
401 state.ended = true;
402 state.outerTalkback(. signal);
403 };
404
405 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
406 state.innerTalkbacks = Rebel.Array.makeEmpty();
407 | Pull =>
408 if (!state.outerPulled && !state.ended) {
409 state.outerPulled = true;
410 state.outerTalkback(. Pull);
411 } else {
412 state.outerPulled = false;
413 };
414
415 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
416 },
417 ),
418 );
419 })
420 );
421
422[@genType]
423let merge = (sources: array(sourceT('a))): sourceT('a) =>
424 mergeMap((. x) => x, Wonka_sources.fromArray(sources));
425
426[@genType]
427let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
428 mergeMap((. x) => x, source);
429
430[@genType]
431let flatten = mergeAll;
432
433[@genType]
434let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
435 curry(source =>
436 curry(sink => {
437 let ended = ref(false);
438 source((. signal) =>
439 switch (signal) {
440 | Start(talkback) =>
441 sink(.
442 Start(
443 (. signal) =>
444 if (! ended^) {
445 switch (signal) {
446 | Pull => talkback(. signal)
447 | Close =>
448 ended := true;
449 talkback(. signal);
450 f(.);
451 };
452 },
453 ),
454 )
455 | Push(_) when ! ended^ => sink(. signal)
456 | Push(_) => ()
457 | End when ! ended^ =>
458 ended := true;
459 sink(. signal);
460 f(.);
461 | End => ()
462 }
463 );
464 })
465 );
466
467[@genType]
468let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
469 curry(source =>
470 curry(sink => {
471 let ended = ref(false);
472 source((. signal) => {
473 switch (signal) {
474 | Start(talkback) =>
475 sink(.
476 Start(
477 (. signal) =>
478 if (! ended^) {
479 switch (signal) {
480 | Pull => talkback(. signal)
481 | Close =>
482 ended := true;
483 talkback(. signal);
484 };
485 },
486 ),
487 )
488 | Push(x) when ! ended^ =>
489 f(. x);
490 sink(. signal);
491 | Push(_) => ()
492 | End when ! ended^ =>
493 ended := true;
494 sink(. signal);
495 | End => ()
496 }
497 });
498 })
499 );
500
501[@genType]
502let tap = onPush;
503
504[@genType]
505let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
506 curry(source =>
507 curry(sink =>
508 source((. signal) =>
509 switch (signal) {
510 | Start(_) =>
511 sink(. signal);
512 f(.);
513 | _ => sink(. signal)
514 }
515 )
516 )
517 );
518
519type sampleStateT('a) = {
520 mutable sourceTalkback: (. talkbackT) => unit,
521 mutable notifierTalkback: (. talkbackT) => unit,
522 mutable value: option('a),
523 mutable pulled: bool,
524 mutable ended: bool,
525};
526
527[@genType]
528let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
529 curry(source =>
530 curry(sink => {
531 let state = {
532 sourceTalkback: talkbackPlaceholder,
533 notifierTalkback: talkbackPlaceholder,
534 value: None,
535 pulled: false,
536 ended: false,
537 };
538
539 source((. signal) =>
540 switch (signal) {
541 | Start(tb) => state.sourceTalkback = tb
542 | Push(x) =>
543 state.value = Some(x);
544 if (!state.pulled) {
545 state.pulled = true;
546 state.notifierTalkback(. Pull);
547 state.sourceTalkback(. Pull);
548 } else {
549 state.pulled = false;
550 };
551 | End when !state.ended =>
552 state.ended = true;
553 state.notifierTalkback(. Close);
554 sink(. End);
555 | End => ()
556 }
557 );
558
559 notifier((. signal) =>
560 switch (signal, state.value) {
561 | (Start(tb), _) => state.notifierTalkback = tb
562 | (End, _) when !state.ended =>
563 state.ended = true;
564 state.sourceTalkback(. Close);
565 sink(. End);
566 | (End, _) => ()
567 | (Push(_), Some(x)) when !state.ended =>
568 state.value = None;
569 sink(. Push(x));
570 | (Push(_), _) => ()
571 }
572 );
573
574 sink(.
575 Start(
576 (. signal) =>
577 if (!state.ended) {
578 switch (signal) {
579 | Pull when !state.pulled =>
580 state.pulled = true;
581 state.sourceTalkback(. Pull);
582 state.notifierTalkback(. Pull);
583 | Pull => ()
584 | Close =>
585 state.ended = true;
586 state.sourceTalkback(. Close);
587 state.notifierTalkback(. Close);
588 };
589 },
590 ),
591 );
592 })
593 );
594
595[@genType]
596let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
597 curry(source =>
598 curry(sink => {
599 let acc = ref(seed);
600
601 source((. signal) =>
602 sink(.
603 switch (signal) {
604 | Push(x) =>
605 acc := f(. acc^, x);
606 Push(acc^);
607 | Start(x) => Start(x)
608 | End => End
609 },
610 )
611 );
612 })
613 );
614
615type shareStateT('a) = {
616 mutable sinks: Rebel.Array.t(sinkT('a)),
617 mutable talkback: (. talkbackT) => unit,
618 mutable gotSignal: bool,
619};
620
621[@genType]
622let share = (source: sourceT('a)): sourceT('a) => {
623 let state = {
624 sinks: Rebel.Array.makeEmpty(),
625 talkback: talkbackPlaceholder,
626 gotSignal: false,
627 };
628
629 sink => {
630 state.sinks = Rebel.Array.append(state.sinks, sink);
631
632 if (Rebel.Array.size(state.sinks) === 1) {
633 source((. signal) =>
634 switch (signal) {
635 | Push(_) =>
636 state.gotSignal = false;
637 Rebel.Array.forEach(state.sinks, sink => sink(. signal));
638 | Start(x) => state.talkback = x
639 | End =>
640 Rebel.Array.forEach(state.sinks, sink => sink(. End));
641 state.sinks = Rebel.Array.makeEmpty();
642 }
643 );
644 };
645
646 sink(.
647 Start(
648 (. signal) =>
649 switch (signal) {
650 | Close =>
651 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
652 if (Rebel.Array.size(state.sinks) === 0) {
653 state.talkback(. Close);
654 };
655 | Pull when !state.gotSignal =>
656 state.gotSignal = true;
657 state.talkback(. signal);
658 | Pull => ()
659 },
660 ),
661 );
662 };
663};
664
665type skipStateT = {
666 mutable talkback: (. talkbackT) => unit,
667 mutable rest: int,
668};
669
670[@genType]
671let skip = (wait: int): operatorT('a, 'a) =>
672 curry(source =>
673 curry(sink => {
674 let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};
675
676 source((. signal) =>
677 switch (signal) {
678 | Start(tb) =>
679 state.talkback = tb;
680 sink(. signal);
681 | Push(_) when state.rest > 0 =>
682 state.rest = state.rest - 1;
683 state.talkback(. Pull);
684 | _ => sink(. signal)
685 }
686 );
687 })
688 );
689
690type skipUntilStateT = {
691 mutable sourceTalkback: (. talkbackT) => unit,
692 mutable notifierTalkback: (. talkbackT) => unit,
693 mutable skip: bool,
694 mutable pulled: bool,
695 mutable ended: bool,
696};
697
698[@genType]
699let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
700 curry(source =>
701 curry(sink => {
702 let state: skipUntilStateT = {
703 sourceTalkback: talkbackPlaceholder,
704 notifierTalkback: talkbackPlaceholder,
705 skip: true,
706 pulled: false,
707 ended: false,
708 };
709
710 source((. signal) =>
711 switch (signal) {
712 | Start(tb) =>
713 state.sourceTalkback = tb;
714
715 notifier((. signal) =>
716 switch (signal) {
717 | Start(innerTb) =>
718 state.notifierTalkback = innerTb;
719 innerTb(. Pull);
720 | Push(_) =>
721 state.skip = false;
722 state.notifierTalkback(. Close);
723 | End when state.skip =>
724 state.ended = true;
725 state.sourceTalkback(. Close);
726 | End => ()
727 }
728 );
729 | Push(_) when !state.skip && !state.ended =>
730 state.pulled = false;
731 sink(. signal);
732 | Push(_) when !state.pulled =>
733 state.pulled = true;
734 state.sourceTalkback(. Pull);
735 state.notifierTalkback(. Pull);
736 | Push(_) => state.pulled = false
737 | End =>
738 if (state.skip) {
739 state.notifierTalkback(. Close);
740 };
741 state.ended = true;
742 sink(. End);
743 }
744 );
745
746 sink(.
747 Start(
748 (. signal) =>
749 if (!state.ended) {
750 switch (signal) {
751 | Close =>
752 state.ended = true;
753 state.sourceTalkback(. Close);
754 if (state.skip) {
755 state.notifierTalkback(. Close);
756 };
757 | Pull when !state.pulled =>
758 state.pulled = true;
759 if (state.skip) {
760 state.notifierTalkback(. Pull);
761 };
762 state.sourceTalkback(. Pull);
763 | Pull => ()
764 };
765 },
766 ),
767 );
768 })
769 );
770
771type skipWhileStateT = {
772 mutable talkback: (. talkbackT) => unit,
773 mutable skip: bool,
774};
775
776[@genType]
777let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
778 curry(source =>
779 curry(sink => {
780 let state: skipWhileStateT = {
781 talkback: talkbackPlaceholder,
782 skip: true,
783 };
784
785 source((. signal) =>
786 switch (signal) {
787 | Start(tb) =>
788 state.talkback = tb;
789 sink(. signal);
790 | Push(x) when state.skip =>
791 if (f(. x)) {
792 state.talkback(. Pull);
793 } else {
794 state.skip = false;
795 sink(. signal);
796 }
797 | _ => sink(. signal)
798 }
799 );
800 })
801 );
802
803type switchMapStateT('a) = {
804 mutable outerTalkback: (. talkbackT) => unit,
805 mutable outerPulled: bool,
806 mutable innerTalkback: (. talkbackT) => unit,
807 mutable innerActive: bool,
808 mutable innerPulled: bool,
809 mutable ended: bool,
810};
811
812[@genType]
813let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
814 curry(source =>
815 curry(sink => {
816 let state: switchMapStateT('a) = {
817 outerTalkback: talkbackPlaceholder,
818 outerPulled: false,
819 innerTalkback: talkbackPlaceholder,
820 innerActive: false,
821 innerPulled: false,
822 ended: false,
823 };
824
825 let applyInnerSource = innerSource =>
826 innerSource((. signal) =>
827 switch (signal) {
828 | Start(tb) =>
829 state.innerActive = true;
830 state.innerTalkback = tb;
831 state.innerPulled = false;
832 tb(. Pull);
833 | Push(_) when state.innerActive =>
834 sink(. signal);
835 if (!state.innerPulled) {
836 state.innerTalkback(. Pull);
837 } else {
838 state.innerPulled = false;
839 };
840 | Push(_) => ()
841 | End when state.innerActive =>
842 state.innerActive = false;
843 if (state.ended) {
844 sink(. signal);
845 } else if (!state.outerPulled) {
846 state.outerPulled = true;
847 state.outerTalkback(. Pull);
848 };
849 | End => ()
850 }
851 );
852
853 source((. signal) =>
854 switch (signal) {
855 | Start(tb) => state.outerTalkback = tb
856 | Push(x) when !state.ended =>
857 if (state.innerActive) {
858 state.innerTalkback(. Close);
859 state.innerTalkback = talkbackPlaceholder;
860 };
861
862 if (!state.outerPulled) {
863 state.outerPulled = true;
864 state.outerTalkback(. Pull);
865 } else {
866 state.outerPulled = false;
867 };
868
869 applyInnerSource(f(. x));
870 | Push(_) => ()
871 | End when !state.ended =>
872 state.ended = true;
873 if (!state.innerActive) {
874 sink(. End);
875 };
876 | End => ()
877 }
878 );
879
880 sink(.
881 Start(
882 (. signal) =>
883 switch (signal) {
884 | Pull =>
885 if (!state.ended && !state.outerPulled) {
886 state.outerPulled = true;
887 state.outerTalkback(. Pull);
888 };
889 if (state.innerActive && !state.innerPulled) {
890 state.innerPulled = true;
891 state.innerTalkback(. Pull);
892 };
893 | Close =>
894 if (!state.ended) {
895 state.ended = true;
896 state.outerTalkback(. Close);
897 };
898 if (state.innerActive) {
899 state.innerActive = false;
900 state.innerTalkback(. Close);
901 };
902 },
903 ),
904 );
905 })
906 );
907
908[@genType]
909let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
910 switchMap((. x) => x, source);
911
912type takeStateT = {
913 mutable ended: bool,
914 mutable taken: int,
915 mutable talkback: (. talkbackT) => unit,
916};
917
918[@genType]
919let take = (max: int): operatorT('a, 'a) =>
920 curry(source =>
921 curry(sink => {
922 let state: takeStateT = {
923 ended: false,
924 taken: 0,
925 talkback: talkbackPlaceholder,
926 };
927
928 source((. signal) =>
929 switch (signal) {
930 | Start(tb) when max <= 0 =>
931 state.ended = true;
932 sink(. End);
933 tb(. Close);
934 | Start(tb) => state.talkback = tb
935 | Push(_) when state.taken < max && !state.ended =>
936 state.taken = state.taken + 1;
937 sink(. signal);
938 if (!state.ended && state.taken >= max) {
939 state.ended = true;
940 sink(. End);
941 state.talkback(. Close);
942 };
943 | Push(_) => ()
944 | End when !state.ended =>
945 state.ended = true;
946 sink(. End);
947 | End => ()
948 }
949 );
950
951 sink(.
952 Start(
953 (. signal) =>
954 if (!state.ended) {
955 switch (signal) {
956 | Pull when state.taken < max => state.talkback(. Pull)
957 | Pull => ()
958 | Close =>
959 state.ended = true;
960 state.talkback(. Close);
961 };
962 },
963 ),
964 );
965 })
966 );
967
968[@genType]
969let takeLast = (max: int): operatorT('a, 'a) =>
970 curry(source =>
971 curry(sink => {
972 open Rebel;
973 let talkback = ref(talkbackPlaceholder);
974 let queue = MutableQueue.make();
975
976 source((. signal) =>
977 switch (signal) {
978 | Start(tb) when max <= 0 =>
979 tb(. Close);
980 Wonka_sources.empty(sink);
981 | Start(tb) =>
982 talkback := tb;
983 tb(. Pull);
984 | Push(x) =>
985 let size = MutableQueue.size(queue);
986 if (size >= max && max > 0) {
987 ignore(MutableQueue.pop(queue));
988 };
989
990 MutableQueue.add(queue, x);
991 talkback^(. Pull);
992 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
993 }
994 );
995 })
996 );
997
998type takeUntilStateT = {
999 mutable ended: bool,
1000 mutable sourceTalkback: (. talkbackT) => unit,
1001 mutable notifierTalkback: (. talkbackT) => unit,
1002};
1003
1004[@genType]
1005let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
1006 curry(source =>
1007 curry(sink => {
1008 let state: takeUntilStateT = {
1009 ended: false,
1010 sourceTalkback: talkbackPlaceholder,
1011 notifierTalkback: talkbackPlaceholder,
1012 };
1013
1014 source((. signal) =>
1015 switch (signal) {
1016 | Start(tb) =>
1017 state.sourceTalkback = tb;
1018
1019 notifier((. signal) =>
1020 switch (signal) {
1021 | Start(innerTb) =>
1022 state.notifierTalkback = innerTb;
1023 innerTb(. Pull);
1024 | Push(_) =>
1025 state.ended = true;
1026 state.sourceTalkback(. Close);
1027 sink(. End);
1028 | End => ()
1029 }
1030 );
1031 | End when !state.ended =>
1032 state.ended = true;
1033 state.notifierTalkback(. Close);
1034 sink(. End);
1035 | End => ()
1036 | Push(_) when !state.ended => sink(. signal)
1037 | Push(_) => ()
1038 }
1039 );
1040
1041 sink(.
1042 Start(
1043 (. signal) =>
1044 if (!state.ended) {
1045 switch (signal) {
1046 | Close =>
1047 state.ended = true;
1048 state.sourceTalkback(. Close);
1049 state.notifierTalkback(. Close);
1050 | Pull => state.sourceTalkback(. Pull)
1051 };
1052 },
1053 ),
1054 );
1055 })
1056 );
1057
1058type takeWhileStateT = {
1059 mutable talkback: (. talkbackT) => unit,
1060 mutable ended: bool,
1061};
1062
1063[@genType]
1064let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
1065 curry(source =>
1066 curry(sink => {
1067 let state: takeWhileStateT = {
1068 talkback: talkbackPlaceholder,
1069 ended: false,
1070 };
1071
1072 source((. signal) =>
1073 switch (signal) {
1074 | Start(tb) =>
1075 state.talkback = tb;
1076 sink(. signal);
1077 | End when !state.ended =>
1078 state.ended = true;
1079 sink(. End);
1080 | End => ()
1081 | Push(x) when !state.ended =>
1082 if (!f(. x)) {
1083 state.ended = true;
1084 sink(. End);
1085 state.talkback(. Close);
1086 } else {
1087 sink(. signal);
1088 }
1089 | Push(_) => ()
1090 }
1091 );
1092 })
1093 );