1open Wonka_types;
2open Wonka_helpers;
3
4type bufferStateT('a) = {
5 mutable buffer: Rebel.MutableQueue.t('a),
6 mutable sourceTalkback: (. talkbackT) => unit,
7 mutable notifierTalkback: (. talkbackT) => unit,
8 mutable ended: bool,
9};
10
11[@genType]
12let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
13 curry(source =>
14 curry(sink => {
15 let state = {
16 buffer: Rebel.MutableQueue.make(),
17 sourceTalkback: talkbackPlaceholder,
18 notifierTalkback: talkbackPlaceholder,
19 ended: false,
20 };
21
22 source((. signal) =>
23 switch (signal) {
24 | Start(tb) =>
25 state.sourceTalkback = tb;
26
27 notifier((. signal) =>
28 switch (signal) {
29 | Start(tb) =>
30 state.notifierTalkback = tb;
31 state.notifierTalkback(. Pull);
32 | Push(_) when !state.ended =>
33 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
34 state.buffer = Rebel.MutableQueue.make();
35 state.notifierTalkback(. Pull);
36 | Push(_) => ()
37 | End when !state.ended =>
38 state.ended = true;
39 state.sourceTalkback(. Close);
40 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
41 sink(. End);
42 | End => ()
43 }
44 );
45 | Push(value) when !state.ended =>
46 Rebel.MutableQueue.add(state.buffer, value);
47 state.sourceTalkback(. Pull);
48 | Push(_) => ()
49 | End when !state.ended =>
50 state.ended = true;
51 state.notifierTalkback(. Close);
52 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
53 sink(. End);
54 | End => ()
55 }
56 );
57
58 sink(.
59 Start(
60 (. signal) =>
61 if (!state.ended) {
62 switch (signal) {
63 | Close =>
64 state.ended = true;
65 state.sourceTalkback(. Close);
66 state.notifierTalkback(. Close);
67 | Pull => state.sourceTalkback(. Pull)
68 };
69 },
70 ),
71 );
72 })
73 );
74
75type combineStateT('a, 'b) = {
76 mutable talkbackA: (. talkbackT) => unit,
77 mutable talkbackB: (. talkbackT) => unit,
78 mutable lastValA: option('a),
79 mutable lastValB: option('b),
80 mutable gotSignal: bool,
81 mutable endCounter: int,
82 mutable ended: bool,
83};
84
85[@genType]
86let combine =
87 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
88 curry(sink => {
89 let state = {
90 talkbackA: talkbackPlaceholder,
91 talkbackB: talkbackPlaceholder,
92 lastValA: None,
93 lastValB: None,
94 gotSignal: false,
95 endCounter: 0,
96 ended: false,
97 };
98
99 sourceA((. signal) =>
100 switch (signal, state.lastValB) {
101 | (Start(tb), _) => state.talkbackA = tb
102 | (Push(a), None) =>
103 state.lastValA = Some(a);
104 state.gotSignal = false;
105 | (Push(a), Some(b)) when !state.ended =>
106 state.lastValA = Some(a);
107 state.gotSignal = false;
108 sink(. Push((a, b)));
109 | (End, _) when state.endCounter < 1 =>
110 state.endCounter = state.endCounter + 1
111 | (End, _) when !state.ended =>
112 state.ended = true;
113 sink(. End);
114 | _ => ()
115 }
116 );
117
118 sourceB((. signal) =>
119 switch (signal, state.lastValA) {
120 | (Start(tb), _) => state.talkbackB = tb
121 | (Push(b), None) =>
122 state.lastValB = Some(b);
123 state.gotSignal = false;
124 | (Push(b), Some(a)) when !state.ended =>
125 state.lastValB = Some(b);
126 state.gotSignal = false;
127 sink(. Push((a, b)));
128 | (End, _) when state.endCounter < 1 =>
129 state.endCounter = state.endCounter + 1
130 | (End, _) when !state.ended =>
131 state.ended = true;
132 sink(. End);
133 | _ => ()
134 }
135 );
136
137 sink(.
138 Start(
139 (. signal) =>
140 if (!state.ended) {
141 switch (signal) {
142 | Close =>
143 state.ended = true;
144 state.talkbackA(. Close);
145 state.talkbackB(. Close);
146 | Pull when !state.gotSignal =>
147 state.gotSignal = true;
148 state.talkbackA(. signal);
149 state.talkbackB(. signal);
150 | Pull => ()
151 };
152 },
153 ),
154 );
155 });
156
157type concatMapStateT('a) = {
158 inputQueue: Rebel.MutableQueue.t('a),
159 mutable outerTalkback: (. talkbackT) => unit,
160 mutable innerTalkback: (. talkbackT) => unit,
161 mutable innerActive: bool,
162 mutable closed: bool,
163 mutable ended: bool,
164};
165
166[@genType]
167let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
168 curry(source =>
169 curry(sink => {
170 let state: concatMapStateT('a) = {
171 inputQueue: Rebel.MutableQueue.make(),
172 outerTalkback: talkbackPlaceholder,
173 innerTalkback: talkbackPlaceholder,
174 innerActive: false,
175 closed: false,
176 ended: false,
177 };
178
179 let rec applyInnerSource = innerSource =>
180 innerSource((. signal) =>
181 switch (signal) {
182 | End =>
183 state.innerActive = false;
184 state.innerTalkback = talkbackPlaceholder;
185
186 switch (Rebel.MutableQueue.pop(state.inputQueue)) {
187 | Some(input) => applyInnerSource(f(. input))
188 | None when state.ended => sink(. End)
189 | None => ()
190 };
191 | Start(tb) =>
192 state.innerActive = true;
193 state.innerTalkback = tb;
194 tb(. Pull);
195 | Push(x) when !state.closed =>
196 sink(. Push(x));
197 state.innerTalkback(. Pull);
198 | Push(_) => ()
199 }
200 );
201
202 source((. signal) =>
203 switch (signal) {
204 | End when !state.ended =>
205 state.ended = true;
206 if (!state.innerActive
207 && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
208 sink(. End);
209 };
210 | End => ()
211 | Start(tb) =>
212 state.outerTalkback = tb;
213 tb(. Pull);
214 | Push(x) when !state.ended =>
215 if (state.innerActive) {
216 Rebel.MutableQueue.add(state.inputQueue, x);
217 } else {
218 applyInnerSource(f(. x));
219 };
220
221 state.outerTalkback(. Pull);
222 | Push(_) => ()
223 }
224 );
225
226 sink(.
227 Start(
228 (. signal) =>
229 switch (signal) {
230 | Pull =>
231 if (!state.ended) {
232 state.innerTalkback(. Pull);
233 }
234 | Close =>
235 state.innerTalkback(. Close);
236 if (!state.ended) {
237 state.ended = true;
238 state.closed = true;
239 state.outerTalkback(. Close);
240 state.innerTalkback = talkbackPlaceholder;
241 };
242 },
243 ),
244 );
245 })
246 );
247
248[@genType]
249let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
250 concatMap((. x) => x, source);
251
252[@genType]
253let concat = (sources: array(sourceT('a))): sourceT('a) =>
254 concatMap((. x) => x, Wonka_sources.fromArray(sources));
255
256[@genType]
257let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
258 curry(source =>
259 curry(sink =>
260 captureTalkback(source, (. signal, talkback) =>
261 switch (signal) {
262 | Push(x) when !f(. x) => talkback(. Pull)
263 | _ => sink(. signal)
264 }
265 )
266 )
267 );
268
269[@genType]
270let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
271 curry(source =>
272 curry(sink =>
273 source((. signal) =>
274 sink(.
275 switch (signal) {
276 | Start(x) => Start(x)
277 | Push(x) => Push(f(. x))
278 | End => End
279 },
280 )
281 )
282 )
283 );
284
285type mergeMapStateT = {
286 mutable outerTalkback: (. talkbackT) => unit,
287 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
288 mutable ended: bool,
289};
290
291[@genType]
292let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
293 curry(source =>
294 curry(sink => {
295 let state: mergeMapStateT = {
296 outerTalkback: talkbackPlaceholder,
297 innerTalkbacks: Rebel.Array.makeEmpty(),
298 ended: false,
299 };
300
301 let applyInnerSource = innerSource => {
302 let talkback = ref(talkbackPlaceholder);
303
304 innerSource((. signal) =>
305 switch (signal) {
306 | End =>
307 state.innerTalkbacks =
308 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
309 if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
310 sink(. End);
311 };
312 | Start(tb) =>
313 talkback := tb;
314 state.innerTalkbacks =
315 Rebel.Array.append(state.innerTalkbacks, tb);
316 tb(. Pull);
317 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
318 sink(. Push(x));
319 talkback^(. Pull);
320 | Push(_) => ()
321 }
322 );
323 };
324
325 source((. signal) =>
326 switch (signal) {
327 | End when !state.ended =>
328 state.ended = true;
329 if (Rebel.Array.size(state.innerTalkbacks) === 0) {
330 sink(. End);
331 };
332 | End => ()
333 | Start(tb) =>
334 state.outerTalkback = tb;
335 tb(. Pull);
336 | Push(x) when !state.ended =>
337 applyInnerSource(f(. x));
338 state.outerTalkback(. Pull);
339 | Push(_) => ()
340 }
341 );
342
343 sink(.
344 Start(
345 (. signal) =>
346 switch (signal) {
347 | Close =>
348 Rebel.Array.forEach(state.innerTalkbacks, talkback =>
349 talkback(. Close)
350 );
351 if (!state.ended) {
352 state.ended = true;
353 state.outerTalkback(. Close);
354 Rebel.Array.forEach(state.innerTalkbacks, talkback =>
355 talkback(. Close)
356 );
357 state.innerTalkbacks = Rebel.Array.makeEmpty();
358 };
359 | Pull when !state.ended =>
360 Rebel.Array.forEach(state.innerTalkbacks, talkback =>
361 talkback(. Pull)
362 )
363 | Pull => ()
364 },
365 ),
366 );
367 })
368 );
369
370[@genType]
371let merge = (sources: array(sourceT('a))): sourceT('a) =>
372 mergeMap((. x) => x, Wonka_sources.fromArray(sources));
373
374[@genType]
375let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
376 mergeMap((. x) => x, source);
377
378[@genType]
379let flatten = mergeAll;
380
381[@genType]
382let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
383 curry(source =>
384 curry(sink => {
385 let ended = ref(false);
386
387 source((. signal) =>
388 switch (signal) {
389 | Start(talkback) =>
390 sink(.
391 Start(
392 (. signal) => {
393 switch (signal) {
394 | Close when ! ended^ =>
395 ended := true;
396 f(.);
397 | Close
398 | Pull => ()
399 };
400
401 talkback(. signal);
402 },
403 ),
404 )
405 | End =>
406 if (! ended^) {
407 ended := true;
408 sink(. signal);
409 f(.);
410 }
411 | _ => sink(. signal)
412 }
413 );
414 })
415 );
416
417[@genType]
418let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
419 curry(source =>
420 curry(sink =>
421 source((. signal) => {
422 switch (signal) {
423 | Push(x) => f(. x)
424 | _ => ()
425 };
426
427 sink(. signal);
428 })
429 )
430 );
431
432[@genType]
433let tap = onPush;
434
435[@genType]
436let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
437 curry(source =>
438 curry(sink =>
439 source((. signal) =>
440 switch (signal) {
441 | Start(_) =>
442 sink(. signal);
443 f(.);
444 | _ => sink(. signal)
445 }
446 )
447 )
448 );
449
450type sampleStateT('a) = {
451 mutable ended: bool,
452 mutable value: option('a),
453 mutable sourceTalkback: (. talkbackT) => unit,
454 mutable notifierTalkback: (. talkbackT) => unit,
455};
456
457[@genType]
458let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
459 curry(source =>
460 curry(sink => {
461 let state = {
462 ended: false,
463 value: None,
464 sourceTalkback: (. _: talkbackT) => (),
465 notifierTalkback: (. _: talkbackT) => (),
466 };
467
468 source((. signal) =>
469 switch (signal) {
470 | Start(tb) => state.sourceTalkback = tb
471 | End =>
472 state.ended = true;
473 state.notifierTalkback(. Close);
474 sink(. End);
475 | Push(x) => state.value = Some(x)
476 }
477 );
478
479 notifier((. signal) =>
480 switch (signal, state.value) {
481 | (Start(tb), _) => state.notifierTalkback = tb
482 | (End, _) =>
483 state.ended = true;
484 state.sourceTalkback(. Close);
485 sink(. End);
486 | (Push(_), Some(x)) when !state.ended =>
487 state.value = None;
488 sink(. Push(x));
489 | (Push(_), _) => ()
490 }
491 );
492
493 sink(.
494 Start(
495 (. signal) =>
496 switch (signal) {
497 | Pull =>
498 state.sourceTalkback(. Pull);
499 state.notifierTalkback(. Pull);
500 | Close =>
501 state.ended = true;
502 state.sourceTalkback(. Close);
503 state.notifierTalkback(. Close);
504 },
505 ),
506 );
507 })
508 );
509
510[@genType]
511let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
512 curry(source =>
513 curry(sink => {
514 let acc = ref(seed);
515
516 source((. signal) =>
517 sink(.
518 switch (signal) {
519 | Push(x) =>
520 acc := f(. acc^, x);
521 Push(acc^);
522 | Start(x) => Start(x)
523 | End => End
524 },
525 )
526 );
527 })
528 );
529
530type shareStateT('a) = {
531 mutable sinks: Rebel.Array.t(sinkT('a)),
532 mutable talkback: (. talkbackT) => unit,
533 mutable gotSignal: bool,
534};
535
536[@genType]
537let share = (source: sourceT('a)): sourceT('a) => {
538 let state = {
539 sinks: Rebel.Array.makeEmpty(),
540 talkback: talkbackPlaceholder,
541 gotSignal: false,
542 };
543
544 sink => {
545 state.sinks = Rebel.Array.append(state.sinks, sink);
546
547 if (Rebel.Array.size(state.sinks) === 1) {
548 source((. signal) =>
549 switch (signal) {
550 | Push(_) =>
551 state.gotSignal = false;
552 Rebel.Array.forEach(state.sinks, sink => sink(. signal));
553 | Start(x) => state.talkback = x
554 | End =>
555 Rebel.Array.forEach(state.sinks, sink => sink(. End));
556 state.sinks = Rebel.Array.makeEmpty();
557 }
558 );
559 };
560
561 sink(.
562 Start(
563 (. signal) =>
564 switch (signal) {
565 | Close =>
566 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
567 if (Rebel.Array.size(state.sinks) === 0) {
568 state.talkback(. Close);
569 };
570 | Pull when !state.gotSignal =>
571 state.gotSignal = true;
572 state.talkback(. signal);
573 | Pull => ()
574 },
575 ),
576 );
577 };
578};
579
580[@genType]
581let skip = (wait: int): operatorT('a, 'a) =>
582 curry(source =>
583 curry(sink => {
584 let rest = ref(wait);
585
586 captureTalkback(source, (. signal, talkback) =>
587 switch (signal) {
588 | Push(_) when rest^ > 0 =>
589 rest := rest^ - 1;
590 talkback(. Pull);
591 | _ => sink(. signal)
592 }
593 );
594 })
595 );
596
597type skipUntilStateT = {
598 mutable skip: bool,
599 mutable ended: bool,
600 mutable gotSignal: bool,
601 mutable sourceTalkback: (. talkbackT) => unit,
602 mutable notifierTalkback: (. talkbackT) => unit,
603};
604
605[@genType]
606let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
607 curry(source =>
608 curry(sink => {
609 let state: skipUntilStateT = {
610 skip: true,
611 ended: false,
612 gotSignal: false,
613 sourceTalkback: talkbackPlaceholder,
614 notifierTalkback: talkbackPlaceholder,
615 };
616
617 source((. signal) =>
618 switch (signal) {
619 | Start(tb) =>
620 state.sourceTalkback = tb;
621
622 notifier((. signal) =>
623 switch (signal) {
624 | Start(innerTb) =>
625 state.notifierTalkback = innerTb;
626 innerTb(. Pull);
627 tb(. Pull);
628 | Push(_) =>
629 state.skip = false;
630 state.notifierTalkback(. Close);
631 | End => ()
632 }
633 );
634 | Push(_) when state.skip && !state.ended =>
635 state.sourceTalkback(. Pull)
636 | Push(_) when !state.ended =>
637 state.gotSignal = false;
638 sink(. signal);
639 | Push(_) => ()
640 | End =>
641 if (state.skip) {
642 state.notifierTalkback(. Close);
643 };
644 state.ended = true;
645 sink(. End);
646 }
647 );
648
649 sink(.
650 Start(
651 (. signal) =>
652 switch (signal) {
653 | Close =>
654 if (state.skip) {
655 state.notifierTalkback(. Close);
656 };
657 state.ended = true;
658 state.sourceTalkback(. Close);
659 | Pull when !state.gotSignal && !state.ended =>
660 state.gotSignal = true;
661 state.sourceTalkback(. Pull);
662 | Pull => ()
663 },
664 ),
665 );
666 })
667 );
668
669[@genType]
670let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
671 curry(source =>
672 curry(sink => {
673 let skip = ref(true);
674
675 captureTalkback(source, (. signal, talkback) =>
676 switch (signal) {
677 | Push(x) when skip^ =>
678 if (f(. x)) {
679 talkback(. Pull);
680 } else {
681 skip := false;
682 sink(. signal);
683 }
684 | _ => sink(. signal)
685 }
686 );
687 })
688 );
689
690type switchMapStateT('a) = {
691 mutable outerTalkback: (. talkbackT) => unit,
692 mutable innerTalkback: (. talkbackT) => unit,
693 mutable innerActive: bool,
694 mutable closed: bool,
695 mutable ended: bool,
696};
697
698[@genType]
699let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
700 curry(source =>
701 curry(sink => {
702 let state: switchMapStateT('a) = {
703 outerTalkback: talkbackPlaceholder,
704 innerTalkback: talkbackPlaceholder,
705 innerActive: false,
706 closed: false,
707 ended: false,
708 };
709
710 let applyInnerSource = innerSource =>
711 innerSource((. signal) =>
712 switch (signal) {
713 | End =>
714 state.innerActive = false;
715 state.innerTalkback = talkbackPlaceholder;
716 if (state.ended) {
717 sink(. End);
718 };
719 | Start(tb) =>
720 state.innerActive = true;
721 state.innerTalkback = tb;
722 tb(. Pull);
723 | Push(x) when !state.closed =>
724 sink(. Push(x));
725 state.innerTalkback(. Pull);
726 | Push(_) => ()
727 }
728 );
729
730 source((. signal) =>
731 switch (signal) {
732 | End when !state.ended =>
733 state.ended = true;
734 if (!state.innerActive) {
735 sink(. End);
736 };
737 | End => ()
738 | Start(tb) =>
739 state.outerTalkback = tb;
740 tb(. Pull);
741 | Push(x) when !state.ended =>
742 if (state.innerActive) {
743 state.innerTalkback(. Close);
744 state.innerTalkback = talkbackPlaceholder;
745 };
746 applyInnerSource(f(. x));
747 state.outerTalkback(. Pull);
748 | Push(_) => ()
749 }
750 );
751
752 sink(.
753 Start(
754 (. signal) =>
755 switch (signal) {
756 | Pull => state.innerTalkback(. Pull)
757 | Close =>
758 state.innerTalkback(. Close);
759 if (!state.ended) {
760 state.ended = true;
761 state.closed = true;
762 state.outerTalkback(. Close);
763 state.innerTalkback = talkbackPlaceholder;
764 };
765 },
766 ),
767 );
768 })
769 );
770
771[@genType]
772let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
773 switchMap((. x) => x, source);
774
775type takeStateT = {
776 mutable taken: int,
777 mutable talkback: (. talkbackT) => unit,
778};
779
780[@genType]
781let take = (max: int): operatorT('a, 'a) =>
782 curry(source =>
783 curry(sink => {
784 let state: takeStateT = {taken: 0, talkback: talkbackPlaceholder};
785
786 source((. signal) =>
787 switch (signal) {
788 | Start(tb) => state.talkback = tb
789 | Push(_) when state.taken < max =>
790 state.taken = state.taken + 1;
791 sink(. signal);
792
793 if (state.taken === max) {
794 sink(. End);
795 state.talkback(. Close);
796 };
797 | Push(_) => ()
798 | End when state.taken < max =>
799 state.taken = max;
800 sink(. End);
801 | End => ()
802 }
803 );
804
805 sink(.
806 Start(
807 (. signal) =>
808 if (state.taken < max) {
809 switch (signal) {
810 | Pull => state.talkback(. Pull)
811 | Close =>
812 state.taken = max;
813 state.talkback(. Close);
814 };
815 },
816 ),
817 );
818 })
819 );
820
821[@genType]
822let takeLast = (max: int): operatorT('a, 'a) =>
823 curry(source =>
824 curry(sink => {
825 open Rebel;
826 let queue = MutableQueue.make();
827
828 captureTalkback(source, (. signal, talkback) =>
829 switch (signal) {
830 | Start(_) => talkback(. Pull)
831 | Push(x) =>
832 let size = MutableQueue.size(queue);
833 if (size >= max && max > 0) {
834 ignore(MutableQueue.pop(queue));
835 };
836
837 MutableQueue.add(queue, x);
838 talkback(. Pull);
839 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
840 }
841 );
842 })
843 );
844
845type takeUntilStateT = {
846 mutable ended: bool,
847 mutable sourceTalkback: (. talkbackT) => unit,
848 mutable notifierTalkback: (. talkbackT) => unit,
849};
850
851[@genType]
852let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
853 curry(source =>
854 curry(sink => {
855 let state: takeUntilStateT = {
856 ended: false,
857 sourceTalkback: talkbackPlaceholder,
858 notifierTalkback: talkbackPlaceholder,
859 };
860
861 source((. signal) =>
862 switch (signal) {
863 | Start(tb) =>
864 state.sourceTalkback = tb;
865
866 notifier((. signal) =>
867 switch (signal) {
868 | Start(innerTb) =>
869 state.notifierTalkback = innerTb;
870 innerTb(. Pull);
871 | Push(_) =>
872 state.ended = true;
873 state.sourceTalkback(. Close);
874 sink(. End);
875 | End => ()
876 }
877 );
878 | End when !state.ended =>
879 state.ended = true;
880 state.notifierTalkback(. Close);
881 sink(. End);
882 | End => ()
883 | Push(_) when !state.ended => sink(. signal)
884 | Push(_) => ()
885 }
886 );
887
888 sink(.
889 Start(
890 (. signal) =>
891 if (!state.ended) {
892 switch (signal) {
893 | Close =>
894 state.ended = true;
895 state.sourceTalkback(. Close);
896 state.notifierTalkback(. Close);
897 | Pull => state.sourceTalkback(. Pull)
898 };
899 },
900 ),
901 );
902 })
903 );
904
905[@genType]
906let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
907 curry(source =>
908 curry(sink => {
909 let ended = ref(false);
910 let talkback = ref(talkbackPlaceholder);
911
912 source((. signal) =>
913 switch (signal) {
914 | Start(tb) =>
915 talkback := tb;
916 sink(. signal);
917 | End when ! ended^ =>
918 ended := true;
919 sink(. End);
920 | End => ()
921 | Push(x) when ! ended^ =>
922 if (!f(. x)) {
923 ended := true;
924 sink(. End);
925 talkback^(. Close);
926 } else {
927 sink(. signal);
928 }
929 | Push(_) => ()
930 }
931 );
932
933 sink(.
934 Start(
935 (. signal) =>
936 if (! ended^) {
937 switch (signal) {
938 | Pull => talkback^(. Pull)
939 | Close =>
940 ended := true;
941 talkback^(. Close);
942 };
943 },
944 ),
945 );
946 })
947 );