1open Wonka_types;
2open Wonka_helpers;
3
4type bufferStateT('a) = {
5 mutable buffer: Rebel.MutableQueue.t('a),
6 mutable sourceTalkback: (. talkbackT) => unit,
7 mutable notifierTalkback: (. talkbackT) => unit,
8 mutable pulled: bool,
9 mutable ended: bool,
10};
11
12[@genType]
13let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
14 curry(source =>
15 curry(sink => {
16 let state = {
17 buffer: Rebel.MutableQueue.make(),
18 sourceTalkback: talkbackPlaceholder,
19 notifierTalkback: talkbackPlaceholder,
20 pulled: false,
21 ended: false,
22 };
23
24 source((. signal) =>
25 switch (signal) {
26 | Start(tb) =>
27 state.sourceTalkback = tb;
28
29 notifier((. signal) =>
30 switch (signal) {
31 | Start(tb) => state.notifierTalkback = tb
32 | Push(_) when !state.ended =>
33 if (Rebel.MutableQueue.size(state.buffer) > 0) {
34 let buffer = state.buffer;
35 state.buffer = Rebel.MutableQueue.make();
36 sink(. Push(Rebel.MutableQueue.toArray(buffer)));
37 }
38 | Push(_) => ()
39 | End when !state.ended =>
40 state.ended = true;
41 state.sourceTalkback(. Close);
42 if (Rebel.MutableQueue.size(state.buffer) > 0) {
43 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
44 };
45 sink(. End);
46 | End => ()
47 }
48 );
49 | Push(value) when !state.ended =>
50 Rebel.MutableQueue.add(state.buffer, value);
51 if (!state.pulled) {
52 state.pulled = true;
53 state.sourceTalkback(. Pull);
54 state.notifierTalkback(. Pull);
55 } else {
56 state.pulled = false;
57 };
58 | Push(_) => ()
59 | End when !state.ended =>
60 state.ended = true;
61 state.notifierTalkback(. Close);
62 if (Rebel.MutableQueue.size(state.buffer) > 0) {
63 sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
64 };
65 sink(. End);
66 | End => ()
67 }
68 );
69
70 sink(.
71 Start(
72 (. signal) =>
73 if (!state.ended) {
74 switch (signal) {
75 | Close =>
76 state.ended = true;
77 state.sourceTalkback(. Close);
78 state.notifierTalkback(. Close);
79 | Pull when !state.pulled =>
80 state.pulled = true;
81 state.sourceTalkback(. Pull);
82 state.notifierTalkback(. Pull);
83 | Pull => ()
84 };
85 },
86 ),
87 );
88 })
89 );
90
91type combineStateT('a, 'b) = {
92 mutable talkbackA: (. talkbackT) => unit,
93 mutable talkbackB: (. talkbackT) => unit,
94 mutable lastValA: option('a),
95 mutable lastValB: option('b),
96 mutable gotSignal: bool,
97 mutable endCounter: int,
98 mutable ended: bool,
99};
100
101[@genType]
102let combine =
103 (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
104 curry(sink => {
105 let state = {
106 talkbackA: talkbackPlaceholder,
107 talkbackB: talkbackPlaceholder,
108 lastValA: None,
109 lastValB: None,
110 gotSignal: false,
111 endCounter: 0,
112 ended: false,
113 };
114
115 sourceA((. signal) =>
116 switch (signal, state.lastValB) {
117 | (Start(tb), _) => state.talkbackA = tb
118 | (Push(a), None) =>
119 state.lastValA = Some(a);
120 if (!state.gotSignal) {
121 state.talkbackB(. Pull);
122 } else {
123 state.gotSignal = false;
124 };
125 | (Push(a), Some(b)) when !state.ended =>
126 state.lastValA = Some(a);
127 state.gotSignal = false;
128 sink(. Push((a, b)));
129 | (End, _) when state.endCounter < 1 =>
130 state.endCounter = state.endCounter + 1
131 | (End, _) when !state.ended =>
132 state.ended = true;
133 sink(. End);
134 | _ => ()
135 }
136 );
137
138 sourceB((. signal) =>
139 switch (signal, state.lastValA) {
140 | (Start(tb), _) => state.talkbackB = tb
141 | (Push(b), None) =>
142 state.lastValB = Some(b);
143 if (!state.gotSignal) {
144 state.talkbackA(. Pull);
145 } else {
146 state.gotSignal = false;
147 };
148 | (Push(b), Some(a)) when !state.ended =>
149 state.lastValB = Some(b);
150 state.gotSignal = false;
151 sink(. Push((a, b)));
152 | (End, _) when state.endCounter < 1 =>
153 state.endCounter = state.endCounter + 1
154 | (End, _) when !state.ended =>
155 state.ended = true;
156 sink(. End);
157 | _ => ()
158 }
159 );
160
161 sink(.
162 Start(
163 (. signal) =>
164 if (!state.ended) {
165 switch (signal) {
166 | Close =>
167 state.ended = true;
168 state.talkbackA(. Close);
169 state.talkbackB(. Close);
170 | Pull when !state.gotSignal =>
171 state.gotSignal = true;
172 state.talkbackA(. signal);
173 state.talkbackB(. signal);
174 | Pull => ()
175 };
176 },
177 ),
178 );
179 });
180
181type concatMapStateT('a) = {
182 inputQueue: Rebel.MutableQueue.t('a),
183 mutable outerTalkback: (. talkbackT) => unit,
184 mutable outerPulled: bool,
185 mutable innerTalkback: (. talkbackT) => unit,
186 mutable innerActive: bool,
187 mutable innerPulled: bool,
188 mutable ended: bool,
189};
190
191[@genType]
192let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
193 curry(source =>
194 curry(sink => {
195 let state: concatMapStateT('a) = {
196 inputQueue: Rebel.MutableQueue.make(),
197 outerTalkback: talkbackPlaceholder,
198 outerPulled: false,
199 innerTalkback: talkbackPlaceholder,
200 innerActive: false,
201 innerPulled: false,
202 ended: false,
203 };
204
205 let rec applyInnerSource = innerSource =>
206 innerSource((. signal) =>
207 switch (signal) {
208 | Start(tb) =>
209 state.innerActive = true;
210 state.innerTalkback = tb;
211 state.innerPulled = false;
212 tb(. Pull);
213 | Push(_) when state.innerActive =>
214 sink(. signal);
215 if (!state.innerPulled) {
216 state.innerTalkback(. Pull);
217 } else {
218 state.innerPulled = false;
219 };
220 | Push(_) => ()
221 | End when state.innerActive =>
222 state.innerActive = false;
223 switch (Rebel.MutableQueue.pop(state.inputQueue)) {
224 | Some(input) => applyInnerSource(f(. input))
225 | None when state.ended => sink(. End)
226 | None when !state.outerPulled =>
227 state.outerPulled = true;
228 state.outerTalkback(. Pull);
229 | None => ()
230 };
231 | End => ()
232 }
233 );
234
235 source((. signal) =>
236 switch (signal) {
237 | Start(tb) => state.outerTalkback = tb
238 | Push(x) when !state.ended =>
239 state.outerPulled = false;
240 if (state.innerActive) {
241 Rebel.MutableQueue.add(state.inputQueue, x);
242 } else {
243 applyInnerSource(f(. x));
244 };
245 | Push(_) => ()
246 | End when !state.ended =>
247 state.ended = true;
248 if (!state.innerActive
249 && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
250 sink(. End);
251 };
252 | End => ()
253 }
254 );
255
256 sink(.
257 Start(
258 (. signal) =>
259 switch (signal) {
260 | Pull =>
261 if (!state.ended && !state.outerPulled) {
262 state.outerPulled = true;
263 state.outerTalkback(. Pull);
264 };
265 if (state.innerActive && !state.innerPulled) {
266 state.innerPulled = true;
267 state.innerTalkback(. Pull);
268 };
269 | Close =>
270 if (!state.ended) {
271 state.ended = true;
272 state.outerTalkback(. Close);
273 };
274 if (state.innerActive) {
275 state.innerActive = false;
276 state.innerTalkback(. Close);
277 };
278 },
279 ),
280 );
281 })
282 );
283
284[@genType]
285let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
286 concatMap((. x) => x, source);
287
288[@genType]
289let concat = (sources: array(sourceT('a))): sourceT('a) =>
290 concatMap((. x) => x, Wonka_sources.fromArray(sources));
291
292[@genType]
293let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
294 curry(source =>
295 curry(sink => {
296 let talkback = ref(talkbackPlaceholder);
297
298 source((. signal) =>
299 switch (signal) {
300 | Start(tb) =>
301 talkback := tb;
302 sink(. signal);
303 | Push(x) when !f(. x) => talkback^(. Pull)
304 | _ => sink(. signal)
305 }
306 );
307 })
308 );
309
310[@genType]
311let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
312 curry(source =>
313 curry(sink =>
314 source((. signal) =>
315 sink(.
316 /* The signal needs to be recreated for genType to generate
317 the correct generics during codegen */
318 switch (signal) {
319 | Start(x) => Start(x)
320 | Push(x) => Push(f(. x))
321 | End => End
322 },
323 )
324 )
325 )
326 );
327
328type mergeMapStateT = {
329 mutable outerTalkback: (. talkbackT) => unit,
330 mutable outerPulled: bool,
331 mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
332 mutable ended: bool,
333};
334
335[@genType]
336let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
337 curry(source =>
338 curry(sink => {
339 let state: mergeMapStateT = {
340 outerTalkback: talkbackPlaceholder,
341 outerPulled: false,
342 innerTalkbacks: Rebel.Array.makeEmpty(),
343 ended: false,
344 };
345
346 let applyInnerSource = innerSource => {
347 let talkback = ref(talkbackPlaceholder);
348
349 innerSource((. signal) =>
350 switch (signal) {
351 | Start(tb) =>
352 talkback := tb;
353 state.innerTalkbacks =
354 Rebel.Array.append(state.innerTalkbacks, tb);
355 tb(. Pull);
356 | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
357 sink(. Push(x));
358 talkback^(. Pull);
359 | Push(_) => ()
360 | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
361 state.innerTalkbacks =
362 Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
363 let exhausted = Rebel.Array.size(state.innerTalkbacks) === 0;
364 if (state.ended && exhausted) {
365 sink(. End);
366 } else if (!state.outerPulled && exhausted) {
367 state.outerPulled = true;
368 state.outerTalkback(. Pull);
369 };
370 | End => ()
371 }
372 );
373 };
374
375 source((. signal) =>
376 switch (signal) {
377 | Start(tb) => state.outerTalkback = tb
378 | Push(x) when !state.ended =>
379 state.outerPulled = false;
380 applyInnerSource(f(. x));
381 if (!state.outerPulled) {
382 state.outerPulled = true;
383 state.outerTalkback(. Pull);
384 };
385 | Push(_) => ()
386 | End when !state.ended =>
387 state.ended = true;
388 if (Rebel.Array.size(state.innerTalkbacks) === 0) {
389 sink(. End);
390 };
391 | End => ()
392 }
393 );
394
395 sink(.
396 Start(
397 (. signal) =>
398 switch (signal) {
399 | Close =>
400 if (!state.ended) {
401 state.ended = true;
402 state.outerTalkback(. signal);
403 };
404
405 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. signal));
406 state.innerTalkbacks = Rebel.Array.makeEmpty();
407 | Pull =>
408 if (!state.outerPulled && !state.ended) {
409 state.outerPulled = true;
410 state.outerTalkback(. Pull);
411 } else {
412 state.outerPulled = false;
413 };
414
415 Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
416 },
417 ),
418 );
419 })
420 );
421
422[@genType]
423let merge = (sources: array(sourceT('a))): sourceT('a) =>
424 mergeMap((. x) => x, Wonka_sources.fromArray(sources));
425
426[@genType]
427let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
428 mergeMap((. x) => x, source);
429
430[@genType]
431let flatten = mergeAll;
432
433[@genType]
434let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
435 curry(source =>
436 curry(sink => {
437 source((. signal) =>
438 switch (signal) {
439 | Start(talkback) =>
440 sink(.
441 Start(
442 (. signal) => {
443 switch (signal) {
444 | Close => f(.)
445 | _ => ()
446 };
447 talkback(. signal);
448 },
449 ),
450 )
451 | End =>
452 sink(. signal);
453 f(.);
454 | _ => sink(. signal)
455 }
456 )
457 })
458 );
459
460[@genType]
461let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
462 curry(source =>
463 curry(sink =>
464 source((. signal) => {
465 switch (signal) {
466 | Push(x) => f(. x)
467 | _ => ()
468 };
469
470 sink(. signal);
471 })
472 )
473 );
474
475[@genType]
476let tap = onPush;
477
478[@genType]
479let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
480 curry(source =>
481 curry(sink =>
482 source((. signal) =>
483 switch (signal) {
484 | Start(_) =>
485 sink(. signal);
486 f(.);
487 | _ => sink(. signal)
488 }
489 )
490 )
491 );
492
493type sampleStateT('a) = {
494 mutable sourceTalkback: (. talkbackT) => unit,
495 mutable notifierTalkback: (. talkbackT) => unit,
496 mutable value: option('a),
497 mutable pulled: bool,
498 mutable ended: bool,
499};
500
501[@genType]
502let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
503 curry(source =>
504 curry(sink => {
505 let state = {
506 sourceTalkback: talkbackPlaceholder,
507 notifierTalkback: talkbackPlaceholder,
508 value: None,
509 pulled: false,
510 ended: false,
511 };
512
513 source((. signal) =>
514 switch (signal) {
515 | Start(tb) => state.sourceTalkback = tb
516 | Push(x) =>
517 state.value = Some(x);
518 if (!state.pulled) {
519 state.pulled = true;
520 state.notifierTalkback(. Pull);
521 state.sourceTalkback(. Pull);
522 } else {
523 state.pulled = false;
524 };
525 | End when !state.ended =>
526 state.ended = true;
527 state.notifierTalkback(. Close);
528 sink(. End);
529 | End => ()
530 }
531 );
532
533 notifier((. signal) =>
534 switch (signal, state.value) {
535 | (Start(tb), _) => state.notifierTalkback = tb
536 | (End, _) when !state.ended =>
537 state.ended = true;
538 state.sourceTalkback(. Close);
539 sink(. End);
540 | (End, _) => ()
541 | (Push(_), Some(x)) when !state.ended =>
542 state.value = None;
543 sink(. Push(x));
544 | (Push(_), _) => ()
545 }
546 );
547
548 sink(.
549 Start(
550 (. signal) =>
551 if (!state.ended) {
552 switch (signal) {
553 | Pull when !state.pulled =>
554 state.pulled = true;
555 state.sourceTalkback(. Pull);
556 state.notifierTalkback(. Pull);
557 | Pull => ()
558 | Close =>
559 state.ended = true;
560 state.sourceTalkback(. Close);
561 state.notifierTalkback(. Close);
562 };
563 },
564 ),
565 );
566 })
567 );
568
569[@genType]
570let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
571 curry(source =>
572 curry(sink => {
573 let acc = ref(seed);
574
575 source((. signal) =>
576 sink(.
577 switch (signal) {
578 | Push(x) =>
579 acc := f(. acc^, x);
580 Push(acc^);
581 | Start(x) => Start(x)
582 | End => End
583 },
584 )
585 );
586 })
587 );
588
589type shareStateT('a) = {
590 mutable sinks: Rebel.Array.t(sinkT('a)),
591 mutable talkback: (. talkbackT) => unit,
592 mutable gotSignal: bool,
593};
594
595[@genType]
596let share = (source: sourceT('a)): sourceT('a) => {
597 let state = {
598 sinks: Rebel.Array.makeEmpty(),
599 talkback: talkbackPlaceholder,
600 gotSignal: false,
601 };
602
603 sink => {
604 state.sinks = Rebel.Array.append(state.sinks, sink);
605
606 if (Rebel.Array.size(state.sinks) === 1) {
607 source((. signal) =>
608 switch (signal) {
609 | Push(_) =>
610 state.gotSignal = false;
611 Rebel.Array.forEach(state.sinks, sink => sink(. signal));
612 | Start(x) => state.talkback = x
613 | End =>
614 Rebel.Array.forEach(state.sinks, sink => sink(. End));
615 state.sinks = Rebel.Array.makeEmpty();
616 }
617 );
618 };
619
620 sink(.
621 Start(
622 (. signal) =>
623 switch (signal) {
624 | Close =>
625 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
626 if (Rebel.Array.size(state.sinks) === 0) {
627 state.talkback(. Close);
628 };
629 | Pull when !state.gotSignal =>
630 state.gotSignal = true;
631 state.talkback(. signal);
632 | Pull => ()
633 },
634 ),
635 );
636 };
637};
638
639type skipStateT = {
640 mutable talkback: (. talkbackT) => unit,
641 mutable rest: int,
642};
643
644[@genType]
645let skip = (wait: int): operatorT('a, 'a) =>
646 curry(source =>
647 curry(sink => {
648 let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};
649
650 source((. signal) =>
651 switch (signal) {
652 | Start(tb) =>
653 state.talkback = tb;
654 sink(. signal);
655 | Push(_) when state.rest > 0 =>
656 state.rest = state.rest - 1;
657 state.talkback(. Pull);
658 | _ => sink(. signal)
659 }
660 );
661 })
662 );
663
664type skipUntilStateT = {
665 mutable sourceTalkback: (. talkbackT) => unit,
666 mutable notifierTalkback: (. talkbackT) => unit,
667 mutable skip: bool,
668 mutable pulled: bool,
669 mutable ended: bool,
670};
671
672[@genType]
673let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
674 curry(source =>
675 curry(sink => {
676 let state: skipUntilStateT = {
677 sourceTalkback: talkbackPlaceholder,
678 notifierTalkback: talkbackPlaceholder,
679 skip: true,
680 pulled: false,
681 ended: false,
682 };
683
684 source((. signal) =>
685 switch (signal) {
686 | Start(tb) =>
687 state.sourceTalkback = tb;
688
689 notifier((. signal) =>
690 switch (signal) {
691 | Start(innerTb) =>
692 state.notifierTalkback = innerTb;
693 innerTb(. Pull);
694 | Push(_) =>
695 state.skip = false;
696 state.notifierTalkback(. Close);
697 | End when state.skip =>
698 state.ended = true;
699 state.sourceTalkback(. Close);
700 | End => ()
701 }
702 );
703 | Push(_) when !state.skip && !state.ended =>
704 state.pulled = false;
705 sink(. signal);
706 | Push(_) when !state.pulled =>
707 state.pulled = true;
708 state.sourceTalkback(. Pull);
709 state.notifierTalkback(. Pull);
710 | Push(_) => state.pulled = false
711 | End =>
712 if (state.skip) {
713 state.notifierTalkback(. Close);
714 };
715 state.ended = true;
716 sink(. End);
717 }
718 );
719
720 sink(.
721 Start(
722 (. signal) =>
723 if (!state.ended) {
724 switch (signal) {
725 | Close =>
726 state.ended = true;
727 state.sourceTalkback(. Close);
728 if (state.skip) {
729 state.notifierTalkback(. Close);
730 };
731 | Pull when !state.pulled =>
732 state.pulled = true;
733 if (state.skip) {
734 state.notifierTalkback(. Pull);
735 };
736 state.sourceTalkback(. Pull);
737 | Pull => ()
738 };
739 },
740 ),
741 );
742 })
743 );
744
745type skipWhileStateT = {
746 mutable talkback: (. talkbackT) => unit,
747 mutable skip: bool,
748};
749
750[@genType]
751let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
752 curry(source =>
753 curry(sink => {
754 let state: skipWhileStateT = {
755 talkback: talkbackPlaceholder,
756 skip: true,
757 };
758
759 source((. signal) =>
760 switch (signal) {
761 | Start(tb) =>
762 state.talkback = tb;
763 sink(. signal);
764 | Push(x) when state.skip =>
765 if (f(. x)) {
766 state.talkback(. Pull);
767 } else {
768 state.skip = false;
769 sink(. signal);
770 }
771 | _ => sink(. signal)
772 }
773 );
774 })
775 );
776
777type switchMapStateT('a) = {
778 mutable outerTalkback: (. talkbackT) => unit,
779 mutable outerPulled: bool,
780 mutable innerTalkback: (. talkbackT) => unit,
781 mutable innerActive: bool,
782 mutable innerPulled: bool,
783 mutable ended: bool,
784};
785
786[@genType]
787let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
788 curry(source =>
789 curry(sink => {
790 let state: switchMapStateT('a) = {
791 outerTalkback: talkbackPlaceholder,
792 outerPulled: false,
793 innerTalkback: talkbackPlaceholder,
794 innerActive: false,
795 innerPulled: false,
796 ended: false,
797 };
798
799 let applyInnerSource = innerSource =>
800 innerSource((. signal) =>
801 switch (signal) {
802 | Start(tb) =>
803 state.innerActive = true;
804 state.innerTalkback = tb;
805 state.innerPulled = false;
806 tb(. Pull);
807 | Push(_) when state.innerActive =>
808 sink(. signal);
809 if (!state.innerPulled) {
810 state.innerTalkback(. Pull);
811 } else {
812 state.innerPulled = false;
813 };
814 | Push(_) => ()
815 | End when state.innerActive =>
816 state.innerActive = false;
817 if (state.ended) {
818 sink(. signal);
819 } else if (!state.outerPulled) {
820 state.outerPulled = true;
821 state.outerTalkback(. Pull);
822 };
823 | End => ()
824 }
825 );
826
827 source((. signal) =>
828 switch (signal) {
829 | Start(tb) => state.outerTalkback = tb
830 | Push(x) when !state.ended =>
831 if (state.innerActive) {
832 state.innerTalkback(. Close);
833 state.innerTalkback = talkbackPlaceholder;
834 };
835
836 if (!state.outerPulled) {
837 state.outerPulled = true;
838 state.outerTalkback(. Pull);
839 } else {
840 state.outerPulled = false;
841 };
842
843 applyInnerSource(f(. x));
844 | Push(_) => ()
845 | End when !state.ended =>
846 state.ended = true;
847 if (!state.innerActive) {
848 sink(. End);
849 };
850 | End => ()
851 }
852 );
853
854 sink(.
855 Start(
856 (. signal) =>
857 switch (signal) {
858 | Pull =>
859 if (!state.ended && !state.outerPulled) {
860 state.outerPulled = true;
861 state.outerTalkback(. Pull);
862 };
863 if (state.innerActive && !state.innerPulled) {
864 state.innerPulled = true;
865 state.innerTalkback(. Pull);
866 };
867 | Close =>
868 if (!state.ended) {
869 state.ended = true;
870 state.outerTalkback(. Close);
871 };
872 if (state.innerActive) {
873 state.innerActive = false;
874 state.innerTalkback(. Close);
875 };
876 },
877 ),
878 );
879 })
880 );
881
882[@genType]
883let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
884 switchMap((. x) => x, source);
885
886type takeStateT = {
887 mutable ended: bool,
888 mutable taken: int,
889 mutable talkback: (. talkbackT) => unit,
890};
891
892[@genType]
893let take = (max: int): operatorT('a, 'a) =>
894 curry(source =>
895 curry(sink => {
896 let state: takeStateT = {
897 ended: false,
898 taken: 0,
899 talkback: talkbackPlaceholder,
900 };
901
902 source((. signal) =>
903 switch (signal) {
904 | Start(tb) when max <= 0 =>
905 state.ended = true;
906 sink(. End);
907 tb(. Close);
908 | Start(tb) => state.talkback = tb
909 | Push(_) when state.taken < max && !state.ended =>
910 state.taken = state.taken + 1;
911 sink(. signal);
912 if (!state.ended && state.taken >= max) {
913 state.ended = true;
914 sink(. End);
915 state.talkback(. Close);
916 };
917 | Push(_) => ()
918 | End when !state.ended =>
919 state.ended = true;
920 sink(. End);
921 | End => ()
922 }
923 );
924
925 sink(.
926 Start(
927 (. signal) =>
928 if (!state.ended) {
929 switch (signal) {
930 | Pull when state.taken < max => state.talkback(. Pull)
931 | Pull => ()
932 | Close =>
933 state.ended = true;
934 state.talkback(. Close);
935 };
936 },
937 ),
938 );
939 })
940 );
941
942[@genType]
943let takeLast = (max: int): operatorT('a, 'a) =>
944 curry(source =>
945 curry(sink => {
946 open Rebel;
947 let talkback = ref(talkbackPlaceholder);
948 let queue = MutableQueue.make();
949
950 source((. signal) =>
951 switch (signal) {
952 | Start(tb) when max <= 0 =>
953 tb(. Close);
954 Wonka_sources.empty(sink);
955 | Start(tb) =>
956 talkback := tb;
957 tb(. Pull);
958 | Push(x) =>
959 let size = MutableQueue.size(queue);
960 if (size >= max && max > 0) {
961 ignore(MutableQueue.pop(queue));
962 };
963
964 MutableQueue.add(queue, x);
965 talkback^(. Pull);
966 | End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
967 }
968 );
969 })
970 );
971
972type takeUntilStateT = {
973 mutable ended: bool,
974 mutable sourceTalkback: (. talkbackT) => unit,
975 mutable notifierTalkback: (. talkbackT) => unit,
976};
977
978[@genType]
979let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
980 curry(source =>
981 curry(sink => {
982 let state: takeUntilStateT = {
983 ended: false,
984 sourceTalkback: talkbackPlaceholder,
985 notifierTalkback: talkbackPlaceholder,
986 };
987
988 source((. signal) =>
989 switch (signal) {
990 | Start(tb) =>
991 state.sourceTalkback = tb;
992
993 notifier((. signal) =>
994 switch (signal) {
995 | Start(innerTb) =>
996 state.notifierTalkback = innerTb;
997 innerTb(. Pull);
998 | Push(_) =>
999 state.ended = true;
1000 state.sourceTalkback(. Close);
1001 sink(. End);
1002 | End => ()
1003 }
1004 );
1005 | End when !state.ended =>
1006 state.ended = true;
1007 state.notifierTalkback(. Close);
1008 sink(. End);
1009 | End => ()
1010 | Push(_) when !state.ended => sink(. signal)
1011 | Push(_) => ()
1012 }
1013 );
1014
1015 sink(.
1016 Start(
1017 (. signal) =>
1018 if (!state.ended) {
1019 switch (signal) {
1020 | Close =>
1021 state.ended = true;
1022 state.sourceTalkback(. Close);
1023 state.notifierTalkback(. Close);
1024 | Pull => state.sourceTalkback(. Pull)
1025 };
1026 },
1027 ),
1028 );
1029 })
1030 );
1031
1032type takeWhileStateT = {
1033 mutable talkback: (. talkbackT) => unit,
1034 mutable ended: bool,
1035};
1036
1037[@genType]
1038let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
1039 curry(source =>
1040 curry(sink => {
1041 let state: takeWhileStateT = {
1042 talkback: talkbackPlaceholder,
1043 ended: false,
1044 };
1045
1046 source((. signal) =>
1047 switch (signal) {
1048 | Start(tb) =>
1049 state.talkback = tb;
1050 sink(. signal);
1051 | End when !state.ended =>
1052 state.ended = true;
1053 sink(. End);
1054 | End => ()
1055 | Push(x) when !state.ended =>
1056 if (!f(. x)) {
1057 state.ended = true;
1058 sink(. End);
1059 state.talkback(. Close);
1060 } else {
1061 sink(. signal);
1062 }
1063 | Push(_) => ()
1064 }
1065 );
1066 })
1067 );