1open Wonka_types;
2open Wonka_helpers;
3
4type bufferStateT('a) = {
5 mutable buffer: Rebel.MutableQueue.t('a),
6 mutable sourceTalkback: (. talkbackT) => unit,
7 mutable notifierTalkback: (. talkbackT) => unit,
8 mutable pulled: bool,
9 mutable ended: bool,
10};
11
12[@genType]
13let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
14 curry(source =>
15 curry(sink => {
16 let state = {
17 buffer: Rebel.MutableQueue.make(),
18 sourceTalkback: talkbackPlaceholder,
19 notifierTalkback: talkbackPlaceholder,
20 pulled: false,
21 ended: false,
22 };
23
24 source((. signal) =>
25 switch (signal) {
26 | Start(tb) =>
27 state.sourceTalkback = tb;
28
29 notifier((. signal) =>
30 switch (signal) {
31 | Start(tb) => state.notifierTalkback = tb
32 | Push(_) when !state.ended =>
33 if (Rebel.MutableQueue.size(state.buffer) > 0) {
34 let buffer = state.buffer;
35 state.buffer = Rebel.MutableQueue.make();
36 sink(. Push(Rebel.MutableQueue.toArray(buffer)));
37 }
38 | Push(_) => ()
39 | End when !state.ended =>
40 state.ended = true;
41 state.sourceTalkback(. Close);
42 if (Rebel.MutableQueue.size(state.buffer) > 0) {
43 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
44 };
45 sink(. End);
46 | End => ()
47 }
48 );
49 | Push(value) when !state.ended =>
50 Rebel.MutableQueue.add(state.buffer, value);
51 if (!state.pulled) {
52 state.pulled = true;
53 state.sourceTalkback(. Pull);
54 state.notifierTalkback(. Pull);
55 } else {
56 state.pulled = false;
57 };
58 | Push(_) => ()
59 | End when !state.ended =>
60 state.ended = true;
61 state.notifierTalkback(. Close);
62 if (Rebel.MutableQueue.size(state.buffer) > 0) {
63 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
64 };
65 sink(. End);
66 | End => ()
67 }
68 );
69
70 sink(.
71 Start(
72 (. signal) =>
73 if (!state.ended) {
74 switch (signal) {
75 | Close =>
76 state.ended = true;
77 state.sourceTalkback(. Close);
78 state.notifierTalkback(. Close);
79 | Pull when !state.pulled =>
80 state.pulled = true;
81 state.sourceTalkback(. Pull);
82 state.notifierTalkback(. Pull);
83 | Pull => ()
84 };
85 },
86 ),
87 );
88 })
89 );
90
91type combineStateT('a, 'b) = {
92 mutable talkbackA: (. talkbackT) => unit,
93 mutable talkbackB: (. talkbackT) => unit,
94 mutable lastValA: option('a),
95 mutable lastValB: option('b),
96 mutable gotSignal: bool,
97 mutable endCounter: int,
98 mutable ended: bool,
99};
100
101[@genType]
102let combine =
103 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
104 curry(sink => {
105 let state = {
106 talkbackA: talkbackPlaceholder,
107 talkbackB: talkbackPlaceholder,
108 lastValA: None,
109 lastValB: None,
110 gotSignal: false,
111 endCounter: 0,
112 ended: false,
113 };
114
115 sourceA((. signal) =>
116 switch (signal, state.lastValB) {
117 | (Start(tb), _) => state.talkbackA = tb
118 | (Push(a), None) =>
119 state.lastValA = Some(a);
120 if (!state.gotSignal) {
121 state.talkbackB(. Pull);
122 } else {
123 state.gotSignal = false;
124 };
125 | (Push(a), Some(b)) when !state.ended =>
126 state.lastValA = Some(a);
127 state.gotSignal = false;
128 sink(. Push((a, b)));
129 | (End, _) when state.endCounter < 1 =>
130 state.endCounter = state.endCounter + 1
131 | (End, _) when !state.ended =>
132 state.ended = true;
133 sink(. End);
134 | _ => ()
135 }
136 );
137
138 sourceB((. signal) =>
139 switch (signal, state.lastValA) {
140 | (Start(tb), _) => state.talkbackB = tb
141 | (Push(b), None) =>
142 state.lastValB = Some(b);
143 if (!state.gotSignal) {
144 state.talkbackA(. Pull);
145 } else {
146 state.gotSignal = false;
147 };
148 | (Push(b), Some(a)) when !state.ended =>
149 state.lastValB = Some(b);
150 state.gotSignal = false;
151 sink(. Push((a, b)));
152 | (End, _) when state.endCounter < 1 =>
153 state.endCounter = state.endCounter + 1
154 | (End, _) when !state.ended =>
155 state.ended = true;
156 sink(. End);
157 | _ => ()
158 }
159 );
160
161 sink(.
162 Start(
163 (. signal) =>
164 if (!state.ended) {
165 switch (signal) {
166 | Close =>
167 state.ended = true;
168 state.talkbackA(. Close);
169 state.talkbackB(. Close);
170 | Pull when !state.gotSignal =>
171 state.gotSignal = true;
172 state.talkbackA(. signal);
173 state.talkbackB(. signal);
174 | Pull => ()
175 };
176 },
177 ),
178 );
179 });
180
181type concatMapStateT('a) = {
182 inputQueue: Rebel.MutableQueue.t('a),
183 mutable outerTalkback: (. talkbackT) => unit,
184 mutable outerPulled: bool,
185 mutable innerTalkback: (. talkbackT) => unit,
186 mutable innerActive: bool,
187 mutable innerPulled: bool,
188 mutable ended: bool,
189};
190
191[@genType]
192let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
193 curry(source =>
194 curry(sink => {
195 let state: concatMapStateT('a) = {
196 inputQueue: Rebel.MutableQueue.make(),
197 outerTalkback: talkbackPlaceholder,
198 outerPulled: false,
199 innerTalkback: talkbackPlaceholder,
200 innerActive: false,
201 innerPulled: false,
202 ended: false,
203 };
204
205 let rec applyInnerSource = innerSource => {
206 state.innerActive = true;
207 innerSource((. signal) =>
208 switch (signal) {
209 | Start(tb) =>
210 state.innerTalkback = tb;
211 state.innerPulled = false;
212 tb(. Pull);
213 | Push(_) when state.innerActive =>
214 sink(. signal);
215 if (!state.innerPulled) {
216 state.innerTalkback(. Pull);
217 } else {
218 state.innerPulled = false;
219 };
220 | Push(_) => ()
221 | End when state.innerActive =>
222 state.innerActive = false;
223 switch (Rebel.MutableQueue.pop(state.inputQueue)) {
224 | Some(input) => applyInnerSource(f(. input))
225 | None when state.ended => sink(. End)
226 | None when !state.outerPulled =>
227 state.outerPulled = true;
228 state.outerTalkback(. Pull);
229 | None => ()
230 };
231 | End => ()
232 }
233 );
234 };
235
236 source((. signal) =>
237 switch (signal) {
238 | Start(tb) => state.outerTalkback = tb
239 | Push(x) when !state.ended =>
240 state.outerPulled = false;
241 if (state.innerActive) {
242 Rebel.MutableQueue.add(state.inputQueue, x);
243 } else {
244 applyInnerSource(f(. x));
245 };
246 | Push(_) => ()
247 | End when !state.ended =>
248 state.ended = true;
249 if (!state.innerActive
250 && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
251 sink(. End);
252 };
253 | End => ()
254 }
255 );
256
257 sink(.
258 Start(
259 (. signal) =>
260 switch (signal) {
261 | Pull =>
262 if (!state.ended && !state.outerPulled) {
263 state.outerPulled = true;
264 state.outerTalkback(. Pull);
265 };
266 if (state.innerActive && !state.innerPulled) {
267 state.innerPulled = true;
268 state.innerTalkback(. Pull);
269 };
270 | Close =>
271 if (!state.ended) {
272 state.ended = true;
273 state.outerTalkback(. Close);
274 };
275 if (state.innerActive) {
276 state.innerActive = false;
277 state.innerTalkback(. Close);
278 };
279 },
280 ),
281 );
282 })
283 );
284
285[@genType]
286let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
287 concatMap((. x) => x, source);
288
289[@genType]
290let concat = (sources: array(sourceT('a))): sourceT('a) =>
291 concatMap((. x) => x, Wonka_sources.fromArray(sources));
292
293[@genType]
294let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
295 curry(source =>
296 curry(sink => {
297 let talkback = ref(talkbackPlaceholder);
298
299 source((. signal) =>
300 switch (signal) {
301 | Start(tb) =>
302 talkback := tb;
303 sink(. signal);
304 | Push(x) when !f(. x) => talkback^(. Pull)
305 | _ => sink(. signal)
306 }
307 );
308 })
309 );
310
311[@genType]
312let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
313 curry(source =>
314 curry(sink =>
315 source((. signal) =>
316 sink(.
317 /* The signal needs to be recreated for genType to generate
318 the correct generics during codegen */
319 switch (signal) {
320 | Start(x) => Start(x)
321 | Push(x) => Push(f(. x))
322 | End => End
323 },
324 )
325 )
326 )
327 );
328
329type mergeMapStateT = {
330 mutable outerTalkback: (. talkbackT) => unit,
331 mutable outerPulled: bool,
332 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
333 mutable ended: bool,
334};
335
336[@genType]
337let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
338 curry(source =>
339 curry(sink => {
340 let state: mergeMapStateT = {
341 outerTalkback: talkbackPlaceholder,
342 outerPulled: false,
343 innerTalkbacks: Rebel.Array.makeEmpty(),
344 ended: false,
345 };
346
347 let applyInnerSource = innerSource => {
348 let talkback = ref(talkbackPlaceholder);
349
350 innerSource((. signal) =>
351 switch (signal) {
352 | Start(tb) =>
353 talkback := tb;
354 state.innerTalkbacks =
355 Rebel.Array.append(state.innerTalkbacks, tb);
356 tb(. Pull);
357 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
358 sink(. Push(x));
359 talkback^(. Pull);
360 | Push(_) => ()
361 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
362 state.innerTalkbacks =
363 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
364 let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
365 if (state.ended && exhausted) {
366 sink(. End);
367 } else if (!state.outerPulled && exhausted) {
368 state.outerPulled = true;
369 state.outerTalkback(. Pull);
370 };
371 | End => ()
372 }
373 );
374 };
375
376 source((. signal) =>
377 switch (signal) {
378 | Start(tb) => state.outerTalkback = tb
379 | Push(x) when !state.ended =>
380 state.outerPulled = false;
381 applyInnerSource(f(. x));
382 if (!state.outerPulled) {
383 state.outerPulled = true;
384 state.outerTalkback(. Pull);
385 };
386 | Push(_) => ()
387 | End when !state.ended =>
388 state.ended = true;
389 if (Rebel.Array.size(state.innerTalkbacks) === 0) {
390 sink(. End);
391 };
392 | End => ()
393 }
394 );
395
396 sink(.
397 Start(
398 (. signal) =>
399 switch (signal) {
400 | Close =>
401 if (!state.ended) {
402 state.ended = true;
403 state.outerTalkback(. signal);
404 };
405
406 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
407 state.innerTalkbacks = Rebel.Array.makeEmpty();
408 | Pull =>
409 if (!state.outerPulled && !state.ended) {
410 state.outerPulled = true;
411 state.outerTalkback(. Pull);
412 } else {
413 state.outerPulled = false;
414 };
415
416 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
417 },
418 ),
419 );
420 })
421 );
422
423[@genType]
424let merge = (sources: array(sourceT('a))): sourceT('a) =>
425 mergeMap((. x) => x, Wonka_sources.fromArray(sources));
426
427[@genType]
428let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
429 mergeMap((. x) => x, source);
430
431[@genType]
432let flatten = mergeAll;
433
434[@genType]
435let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
436 curry(source =>
437 curry(sink => {
438 let ended = ref(false);
439 source((. signal) =>
440 switch (signal) {
441 | Start(talkback) =>
442 sink(.
443 Start(
444 (. signal) =>
445 if (! ended^) {
446 switch (signal) {
447 | Pull => talkback(. signal)
448 | Close =>
449 ended := true;
450 talkback(. signal);
451 f(.);
452 };
453 },
454 ),
455 )
456 | Push(_) when ! ended^ => sink(. signal)
457 | Push(_) => ()
458 | End when ! ended^ =>
459 ended := true;
460 sink(. signal);
461 f(.);
462 | End => ()
463 }
464 );
465 })
466 );
467
468[@genType]
469let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
470 curry(source =>
471 curry(sink => {
472 let ended = ref(false);
473 source((. signal) => {
474 switch (signal) {
475 | Start(talkback) =>
476 sink(.
477 Start(
478 (. signal) =>
479 if (! ended^) {
480 switch (signal) {
481 | Pull => talkback(. signal)
482 | Close =>
483 ended := true;
484 talkback(. signal);
485 };
486 },
487 ),
488 )
489 | Push(x) when ! ended^ =>
490 f(. x);
491 sink(. signal);
492 | Push(_) => ()
493 | End when ! ended^ =>
494 ended := true;
495 sink(. signal);
496 | End => ()
497 }
498 });
499 })
500 );
501
502[@genType]
503let tap = onPush;
504
505[@genType]
506let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
507 curry(source =>
508 curry(sink =>
509 source((. signal) =>
510 switch (signal) {
511 | Start(_) =>
512 sink(. signal);
513 f(.);
514 | _ => sink(. signal)
515 }
516 )
517 )
518 );
519
520type sampleStateT('a) = {
521 mutable sourceTalkback: (. talkbackT) => unit,
522 mutable notifierTalkback: (. talkbackT) => unit,
523 mutable value: option('a),
524 mutable pulled: bool,
525 mutable ended: bool,
526};
527
528[@genType]
529let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
530 curry(source =>
531 curry(sink => {
532 let state = {
533 sourceTalkback: talkbackPlaceholder,
534 notifierTalkback: talkbackPlaceholder,
535 value: None,
536 pulled: false,
537 ended: false,
538 };
539
540 source((. signal) =>
541 switch (signal) {
542 | Start(tb) => state.sourceTalkback = tb
543 | Push(x) =>
544 state.value = Some(x);
545 if (!state.pulled) {
546 state.pulled = true;
547 state.notifierTalkback(. Pull);
548 state.sourceTalkback(. Pull);
549 } else {
550 state.pulled = false;
551 };
552 | End when !state.ended =>
553 state.ended = true;
554 state.notifierTalkback(. Close);
555 sink(. End);
556 | End => ()
557 }
558 );
559
560 notifier((. signal) =>
561 switch (signal, state.value) {
562 | (Start(tb), _) => state.notifierTalkback = tb
563 | (End, _) when !state.ended =>
564 state.ended = true;
565 state.sourceTalkback(. Close);
566 sink(. End);
567 | (End, _) => ()
568 | (Push(_), Some(x)) when !state.ended =>
569 state.value = None;
570 sink(. Push(x));
571 | (Push(_), _) => ()
572 }
573 );
574
575 sink(.
576 Start(
577 (. signal) =>
578 if (!state.ended) {
579 switch (signal) {
580 | Pull when !state.pulled =>
581 state.pulled = true;
582 state.sourceTalkback(. Pull);
583 state.notifierTalkback(. Pull);
584 | Pull => ()
585 | Close =>
586 state.ended = true;
587 state.sourceTalkback(. Close);
588 state.notifierTalkback(. Close);
589 };
590 },
591 ),
592 );
593 })
594 );
595
596[@genType]
597let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
598 curry(source =>
599 curry(sink => {
600 let acc = ref(seed);
601
602 source((. signal) =>
603 sink(.
604 switch (signal) {
605 | Push(x) =>
606 acc := f(. acc^, x);
607 Push(acc^);
608 | Start(x) => Start(x)
609 | End => End
610 },
611 )
612 );
613 })
614 );
615
616type shareStateT('a) = {
617 mutable sinks: Rebel.Array.t(sinkT('a)),
618 mutable talkback: (. talkbackT) => unit,
619 mutable gotSignal: bool,
620};
621
622[@genType]
623let share = (source: sourceT('a)): sourceT('a) => {
624 let state = {
625 sinks: Rebel.Array.makeEmpty(),
626 talkback: talkbackPlaceholder,
627 gotSignal: false,
628 };
629
630 sink => {
631 state.sinks = Rebel.Array.append(state.sinks, sink);
632
633 if (Rebel.Array.size(state.sinks) === 1) {
634 source((. signal) =>
635 switch (signal) {
636 | Push(_) =>
637 state.gotSignal = false;
638 Rebel.Array.forEach(state.sinks, sink => sink(. signal));
639 | Start(x) => state.talkback = x
640 | End =>
641 Rebel.Array.forEach(state.sinks, sink => sink(. End));
642 state.sinks = Rebel.Array.makeEmpty();
643 }
644 );
645 };
646
647 sink(.
648 Start(
649 (. signal) =>
650 switch (signal) {
651 | Close =>
652 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
653 if (Rebel.Array.size(state.sinks) === 0) {
654 state.talkback(. Close);
655 };
656 | Pull when !state.gotSignal =>
657 state.gotSignal = true;
658 state.talkback(. signal);
659 | Pull => ()
660 },
661 ),
662 );
663 };
664};
665
666type skipStateT = {
667 mutable talkback: (. talkbackT) => unit,
668 mutable rest: int,
669};
670
671[@genType]
672let skip = (wait: int): operatorT('a, 'a) =>
673 curry(source =>
674 curry(sink => {
675 let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};
676
677 source((. signal) =>
678 switch (signal) {
679 | Start(tb) =>
680 state.talkback = tb;
681 sink(. signal);
682 | Push(_) when state.rest > 0 =>
683 state.rest = state.rest - 1;
684 state.talkback(. Pull);
685 | _ => sink(. signal)
686 }
687 );
688 })
689 );
690
691type skipUntilStateT = {
692 mutable sourceTalkback: (. talkbackT) => unit,
693 mutable notifierTalkback: (. talkbackT) => unit,
694 mutable skip: bool,
695 mutable pulled: bool,
696 mutable ended: bool,
697};
698
699[@genType]
700let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
701 curry(source =>
702 curry(sink => {
703 let state: skipUntilStateT = {
704 sourceTalkback: talkbackPlaceholder,
705 notifierTalkback: talkbackPlaceholder,
706 skip: true,
707 pulled: false,
708 ended: false,
709 };
710
711 source((. signal) =>
712 switch (signal) {
713 | Start(tb) =>
714 state.sourceTalkback = tb;
715
716 notifier((. signal) =>
717 switch (signal) {
718 | Start(innerTb) =>
719 state.notifierTalkback = innerTb;
720 innerTb(. Pull);
721 | Push(_) =>
722 state.skip = false;
723 state.notifierTalkback(. Close);
724 | End when state.skip =>
725 state.ended = true;
726 state.sourceTalkback(. Close);
727 | End => ()
728 }
729 );
730 | Push(_) when !state.skip && !state.ended =>
731 state.pulled = false;
732 sink(. signal);
733 | Push(_) when !state.pulled =>
734 state.pulled = true;
735 state.sourceTalkback(. Pull);
736 state.notifierTalkback(. Pull);
737 | Push(_) => state.pulled = false
738 | End =>
739 if (state.skip) {
740 state.notifierTalkback(. Close);
741 };
742 state.ended = true;
743 sink(. End);
744 }
745 );
746
747 sink(.
748 Start(
749 (. signal) =>
750 if (!state.ended) {
751 switch (signal) {
752 | Close =>
753 state.ended = true;
754 state.sourceTalkback(. Close);
755 if (state.skip) {
756 state.notifierTalkback(. Close);
757 };
758 | Pull when !state.pulled =>
759 state.pulled = true;
760 if (state.skip) {
761 state.notifierTalkback(. Pull);
762 };
763 state.sourceTalkback(. Pull);
764 | Pull => ()
765 };
766 },
767 ),
768 );
769 })
770 );
771
772type skipWhileStateT = {
773 mutable talkback: (. talkbackT) => unit,
774 mutable skip: bool,
775};
776
777[@genType]
778let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
779 curry(source =>
780 curry(sink => {
781 let state: skipWhileStateT = {
782 talkback: talkbackPlaceholder,
783 skip: true,
784 };
785
786 source((. signal) =>
787 switch (signal) {
788 | Start(tb) =>
789 state.talkback = tb;
790 sink(. signal);
791 | Push(x) when state.skip =>
792 if (f(. x)) {
793 state.talkback(. Pull);
794 } else {
795 state.skip = false;
796 sink(. signal);
797 }
798 | _ => sink(. signal)
799 }
800 );
801 })
802 );
803
804type switchMapStateT('a) = {
805 mutable outerTalkback: (. talkbackT) => unit,
806 mutable outerPulled: bool,
807 mutable innerTalkback: (. talkbackT) => unit,
808 mutable innerActive: bool,
809 mutable innerPulled: bool,
810 mutable ended: bool,
811};
812
813[@genType]
814let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
815 curry(source =>
816 curry(sink => {
817 let state: switchMapStateT('a) = {
818 outerTalkback: talkbackPlaceholder,
819 outerPulled: false,
820 innerTalkback: talkbackPlaceholder,
821 innerActive: false,
822 innerPulled: false,
823 ended: false,
824 };
825
826 let applyInnerSource = innerSource => {
827 state.innerActive = true;
828 innerSource((. signal) =>
829 if (state.innerActive) {
830 switch (signal) {
831 | Start(tb) =>
832 state.innerTalkback = tb;
833 state.innerPulled = false;
834 tb(. Pull);
835 | Push(_) =>
836 sink(. signal);
837 if (!state.innerPulled) {
838 state.innerTalkback(. Pull);
839 } else {
840 state.innerPulled = false;
841 };
842 | End =>
843 state.innerActive = false;
844 if (state.ended) {
845 sink(. signal);
846 } else if (!state.outerPulled) {
847 state.outerPulled = true;
848 state.outerTalkback(. Pull);
849 };
850 };
851 }
852 );
853 };
854
855 source((. signal) =>
856 switch (signal) {
857 | Start(tb) => state.outerTalkback = tb
858 | Push(x) when !state.ended =>
859 if (state.innerActive) {
860 state.innerTalkback(. Close);
861 state.innerTalkback = talkbackPlaceholder;
862 };
863
864 if (!state.outerPulled) {
865 state.outerPulled = true;
866 state.outerTalkback(. Pull);
867 } else {
868 state.outerPulled = false;
869 };
870
871 applyInnerSource(f(. x));
872 | Push(_) => ()
873 | End when !state.ended =>
874 state.ended = true;
875 if (!state.innerActive) {
876 sink(. End);
877 };
878 | End => ()
879 }
880 );
881
882 sink(.
883 Start(
884 (. signal) =>
885 switch (signal) {
886 | Pull =>
887 if (!state.ended && !state.outerPulled) {
888 state.outerPulled = true;
889 state.outerTalkback(. Pull);
890 };
891 if (state.innerActive && !state.innerPulled) {
892 state.innerPulled = true;
893 state.innerTalkback(. Pull);
894 };
895 | Close =>
896 if (!state.ended) {
897 state.ended = true;
898 state.outerTalkback(. Close);
899 };
900 if (state.innerActive) {
901 state.innerActive = false;
902 state.innerTalkback(. Close);
903 };
904 },
905 ),
906 );
907 })
908 );
909
910[@genType]
911let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
912 switchMap((. x) => x, source);
913
914type takeStateT = {
915 mutable ended: bool,
916 mutable taken: int,
917 mutable talkback: (. talkbackT) => unit,
918};
919
920[@genType]
921let take = (max: int): operatorT('a, 'a) =>
922 curry(source =>
923 curry(sink => {
924 let state: takeStateT = {
925 ended: false,
926 taken: 0,
927 talkback: talkbackPlaceholder,
928 };
929
930 source((. signal) =>
931 switch (signal) {
932 | Start(tb) when max <= 0 =>
933 state.ended = true;
934 sink(. End);
935 tb(. Close);
936 | Start(tb) => state.talkback = tb
937 | Push(_) when state.taken < max && !state.ended =>
938 state.taken = state.taken + 1;
939 sink(. signal);
940 if (!state.ended && state.taken >= max) {
941 state.ended = true;
942 sink(. End);
943 state.talkback(. Close);
944 };
945 | Push(_) => ()
946 | End when !state.ended =>
947 state.ended = true;
948 sink(. End);
949 | End => ()
950 }
951 );
952
953 sink(.
954 Start(
955 (. signal) =>
956 if (!state.ended) {
957 switch (signal) {
958 | Pull when state.taken < max => state.talkback(. Pull)
959 | Pull => ()
960 | Close =>
961 state.ended = true;
962 state.talkback(. Close);
963 };
964 },
965 ),
966 );
967 })
968 );
969
970[@genType]
971let takeLast = (max: int): operatorT('a, 'a) =>
972 curry(source =>
973 curry(sink => {
974 open Rebel;
975 let talkback = ref(talkbackPlaceholder);
976 let queue = MutableQueue.make();
977
978 source((. signal) =>
979 switch (signal) {
980 | Start(tb) when max <= 0 =>
981 tb(. Close);
982 Wonka_sources.empty(sink);
983 | Start(tb) =>
984 talkback := tb;
985 tb(. Pull);
986 | Push(x) =>
987 let size = MutableQueue.size(queue);
988 if (size >= max && max > 0) {
989 ignore(MutableQueue.pop(queue));
990 };
991
992 MutableQueue.add(queue, x);
993 talkback^(. Pull);
994 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
995 }
996 );
997 })
998 );
999
1000type takeUntilStateT = {
1001 mutable ended: bool,
1002 mutable sourceTalkback: (. talkbackT) => unit,
1003 mutable notifierTalkback: (. talkbackT) => unit,
1004};
1005
1006[@genType]
1007let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
1008 curry(source =>
1009 curry(sink => {
1010 let state: takeUntilStateT = {
1011 ended: false,
1012 sourceTalkback: talkbackPlaceholder,
1013 notifierTalkback: talkbackPlaceholder,
1014 };
1015
1016 source((. signal) =>
1017 switch (signal) {
1018 | Start(tb) =>
1019 state.sourceTalkback = tb;
1020
1021 notifier((. signal) =>
1022 switch (signal) {
1023 | Start(innerTb) =>
1024 state.notifierTalkback = innerTb;
1025 innerTb(. Pull);
1026 | Push(_) =>
1027 state.ended = true;
1028 state.sourceTalkback(. Close);
1029 sink(. End);
1030 | End => ()
1031 }
1032 );
1033 | End when !state.ended =>
1034 state.ended = true;
1035 state.notifierTalkback(. Close);
1036 sink(. End);
1037 | End => ()
1038 | Push(_) when !state.ended => sink(. signal)
1039 | Push(_) => ()
1040 }
1041 );
1042
1043 sink(.
1044 Start(
1045 (. signal) =>
1046 if (!state.ended) {
1047 switch (signal) {
1048 | Close =>
1049 state.ended = true;
1050 state.sourceTalkback(. Close);
1051 state.notifierTalkback(. Close);
1052 | Pull => state.sourceTalkback(. Pull)
1053 };
1054 },
1055 ),
1056 );
1057 })
1058 );
1059
1060type takeWhileStateT = {
1061 mutable talkback: (. talkbackT) => unit,
1062 mutable ended: bool,
1063};
1064
1065[@genType]
1066let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
1067 curry(source =>
1068 curry(sink => {
1069 let state: takeWhileStateT = {
1070 talkback: talkbackPlaceholder,
1071 ended: false,
1072 };
1073
1074 source((. signal) =>
1075 switch (signal) {
1076 | Start(tb) =>
1077 state.talkback = tb;
1078 sink(. signal);
1079 | End when !state.ended =>
1080 state.ended = true;
1081 sink(. End);
1082 | End => ()
1083 | Push(x) when !state.ended =>
1084 if (!f(. x)) {
1085 state.ended = true;
1086 sink(. End);
1087 state.talkback(. Close);
1088 } else {
1089 sink(. signal);
1090 }
1091 | Push(_) => ()
1092 }
1093 );
1094 })
1095 );