1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable state shared by the callbacks created inside `buffer`. */
type bufferStateT('a) = {
  /* Values pushed by the source that have not been emitted yet. */
  mutable buffer: Rebel.MutableQueue.t('a),
  /* Talkback taken from the source's Start signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback taken from the notifier's Start signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Set when a sink Pull was forwarded; cleared on the next source Push. */
  mutable pulled: bool,
  /* Set once the source or notifier ended, or the sink sent Close. */
  mutable ended: bool,
};
11
/* Collects the values pushed by `source` into a queue and emits them to the
   sink as an array each time `notifier` pushes (empty queues are not
   emitted). When either the source or the notifier ends, the other one is
   closed, any remaining values are emitted, and the sink receives End. */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      /* Emits whatever is still queued, leaving the queue itself in place. */
      let emitRemaining = () =>
        if (Rebel.MutableQueue.size(state.buffer) > 0) {
          sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
        };

      source((. signal) =>
        switch (signal) {
        | Start(sourceTb) =>
          state.sourceTalkback = sourceTb;

          /* The notifier is only subscribed to once the source has started. */
          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(notifierTb) => state.notifierTalkback = notifierTb
            | Push(_) =>
              if (!state.ended && Rebel.MutableQueue.size(state.buffer) > 0) {
                /* Swap in a fresh queue before emitting the filled one. */
                let filled = state.buffer;
                state.buffer = Rebel.MutableQueue.make();
                sink(. Push(Rebel.MutableQueue.toArray(filled)));
              }
            | End =>
              if (!state.ended) {
                state.ended = true;
                state.sourceTalkback(. Close);
                emitRemaining();
                sink(. End);
              }
            }
          );
        | Push(value) =>
          if (!state.ended) {
            Rebel.MutableQueue.add(state.buffer, value);
            state.pulled = false;
            state.notifierTalkback(. Pull);
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            state.notifierTalkback(. Close);
            emitRemaining();
            sink(. End);
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Close when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              state.notifierTalkback(. Close);
            | Pull when !state.ended && !state.pulled =>
              state.pulled = true;
              state.sourceTalkback(. Pull);
              state.notifierTalkback(. Pull);
            | Close
            | Pull => ()
            },
        ),
      );
    })
  );
85
/* Mutable state shared by the callbacks created inside `combine`. */
type combineStateT('a, 'b) = {
  /* Talkback taken from sourceA's Start signal. */
  mutable talkbackA: (. talkbackT) => unit,
  /* Talkback taken from sourceB's Start signal. */
  mutable talkbackB: (. talkbackT) => unit,
  /* Most recent value pushed by sourceA, if any. */
  mutable lastValA: option('a),
  /* Most recent value pushed by sourceB, if any. */
  mutable lastValB: option('b),
  /* Set when a sink Pull was forwarded; cleared again by the Push handlers. */
  mutable gotSignal: bool,
  /* Incremented by the first End received from either source. */
  mutable endCounter: int,
  /* Set once End was sent to the sink or the sink sent Close. */
  mutable ended: bool,
};
95
/* Combines two sources into a source of pairs. Each side remembers its most
   recent value; a pair is pushed to the sink whenever one side pushes while
   the other side already has a value. The first End received is only
   counted; the second one ends the sink. */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    sourceA((. signalA) =>
      switch (signalA) {
      | Start(tbA) => state.talkbackA = tbA
      | Push(a) =>
        switch (state.lastValB) {
        | None =>
          state.lastValA = Some(a);
          /* No value from B yet: ask B for one unless a pull is pending. */
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackB(. Pull);
          };
        | Some(b) when !state.ended =>
          state.lastValA = Some(a);
          state.gotSignal = false;
          sink(. Push((a, b)));
        | Some(_) => ()
        }
      | End =>
        if (state.endCounter < 1) {
          state.endCounter = state.endCounter + 1;
        } else if (!state.ended) {
          state.ended = true;
          sink(. End);
        }
      }
    );

    sourceB((. signalB) =>
      switch (signalB) {
      | Start(tbB) => state.talkbackB = tbB
      | Push(b) =>
        switch (state.lastValA) {
        | None =>
          state.lastValB = Some(b);
          /* No value from A yet: ask A for one unless a pull is pending. */
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackA(. Pull);
          };
        | Some(a) when !state.ended =>
          state.lastValB = Some(b);
          state.gotSignal = false;
          sink(. Push((a, b)));
        | Some(_) => ()
        }
      | End =>
        if (state.endCounter < 1) {
          state.endCounter = state.endCounter + 1;
        } else if (!state.ended) {
          state.ended = true;
          sink(. End);
        }
      }
    );

    sink(.
      Start(
        (. request) =>
          switch (request) {
          | Close when !state.ended =>
            state.ended = true;
            state.talkbackA(. Close);
            state.talkbackB(. Close);
          | Pull when !state.ended && !state.gotSignal =>
            state.gotSignal = true;
            state.talkbackA(. Pull);
            state.talkbackB(. Pull);
          | Close
          | Pull => ()
          },
      ),
    );
  });
175
/* Mutable state shared by the callbacks created inside `concatMap`. */
type concatMapStateT('a) = {
  /* Outer values received while an inner source was still active. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback taken from the outer source's Start signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Tracks whether a Pull to the outer source is outstanding. */
  mutable outerPulled: bool,
  /* Talkback taken from the current inner source's Start signal. */
  mutable innerTalkback: (. talkbackT) => unit,
  /* True between an inner source's Start and its End (or a sink Close). */
  mutable innerActive: bool,
  /* Tracks whether a sink Pull was already forwarded to the inner source. */
  mutable innerPulled: bool,
  /* Set once the outer source ended or the sink sent Close. */
  mutable ended: bool,
};
185
/* Maps every value of the outer source to an inner source via `f` and
   forwards the inner sources' values to the sink one source at a time.
   Outer values that arrive while an inner source is active are queued and
   started, in order, when the active inner source ends. The sink receives
   End once the outer source has ended, no inner source is active and the
   queue is empty.

   A Close from the sink closes the inner and the outer source. It is also
   honoured after the outer source has already ended: in that case the inner
   source that is still active gets closed instead of being left running. */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      /* Subscribes to one inner source; recursive so that the End of one
         inner source can start the next queued one. */
      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            /* Keep the inner source going unless the sink already pulled
               while handling the value above. */
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            | Some(input) => applyInnerSource(f(. input))
            | None when state.ended => sink(. End)
            | None => ()
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close when state.innerActive =>
              /* The outer source already ended, but an inner source is still
                 running: it must be closed too, otherwise it keeps pushing
                 to a sink that asked to stop. */
              state.innerActive = false;
              state.innerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );
278
/* Flattens a source of sources by running `concatMap` with the identity
   function, i.e. every emitted source is used as-is as the inner source. */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  concatMap((. innerSource) => innerSource, source);
282
/* Turns the array of sources into a source via `Wonka_sources.fromArray`
   and flattens it with `concatMap` using the identity function. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) => {
  let outer = Wonka_sources.fromArray(sources);
  concatMap((. innerSource) => innerSource, outer);
};
286
/* Forwards only the values for which `f` returns true. For a rejected value
   the source is sent a Pull instead; Start and End are passed through. */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let upstream = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          upstream := tb;
          sink(. signal);
        | Push(value) =>
          if (f(. value)) {
            sink(. signal);
          } else {
            /* Rejected: ask the source for a replacement value. */
            upstream^(. Pull);
          }
        | End => sink(. signal)
        }
      );
    })
  );
304
/* Applies `f` to every pushed value; Start and End are forwarded. */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        /* The signal needs to be recreated for genType to generate
           the correct generics during codegen */
        let mapped =
          switch (signal) {
          | Start(talkback) => Start(talkback)
          | Push(value) => Push(f(. value))
          | End => End
          };

        sink(. mapped);
      })
    )
  );
322
/* Mutable state shared by the callbacks created inside `mergeMap`. */
type mergeMapStateT = {
  /* Talkback taken from the outer source's Start signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Set when a sink Pull was forwarded to the outer source; cleared on its
     next Push. */
  mutable outerPulled: bool,
  /* Talkbacks of the inner sources that started and have not ended yet. */
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  /* Set once the outer source ended or the sink sent Close. */
  mutable ended: bool,
};
329
/* Maps every value of the outer source to an inner source via `f` and
   forwards the values of all inner sources to the sink as they arrive. The
   sink receives End once the outer source has ended and no inner talkback
   is registered any more.

   A Close from the sink closes the outer source (if it has not ended yet)
   and every inner source that is still registered. Inner sources are closed
   even when the outer source has already ended, so they cannot keep pushing
   to a sink that asked to stop. */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      /* Subscribes to one inner source and registers its talkback. */
      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          | End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            /* Unregister this inner source (compared by physical identity). */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | End => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          state.outerPulled = false;
          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Take the registered talkbacks and clear the registry first,
                 so inner Push/End handlers become no-ops while closing. */
              let tbs = state.innerTalkbacks;
              state.innerTalkbacks = Rebel.Array.makeEmpty();
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
              Rebel.Array.forEach(tbs, tb => tb(. Close));
            | Pull when !state.ended =>
              if (!state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
            | Pull => ()
            },
        ),
      );
    })
  );
406
/* Turns the array of sources into a source via `Wonka_sources.fromArray`
   and flattens it with `mergeMap` using the identity function. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) => {
  let outer = Wonka_sources.fromArray(sources);
  mergeMap((. innerSource) => innerSource, outer);
};
410
/* Flattens a source of sources by running `mergeMap` with the identity
   function, i.e. every emitted source is used as-is as the inner source. */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeMap((. innerSource) => innerSource, source);
414
/* Alias of `mergeAll`. */
[@genType]
let flatten = mergeAll;
417
/* Calls `f` when the stream terminates: either after End has been forwarded
   to the sink, or right before a Close from the sink is forwarded to the
   source. All signals are passed through. */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          /* Wrap the talkback so that a Close from the sink is observed. */
          let wrapped =
            (. request) => {
              switch (request) {
              | Close => f(.)
              | Pull => ()
              };
              talkback(. request);
            };
          sink(. Start(wrapped));
        | Push(_) => sink(. signal)
        | End =>
          sink(. signal);
          f(.);
        }
      )
    })
  );
444
/* Calls `f` with every pushed value before the signal is forwarded to the
   sink. Start and End are forwarded without calling `f`. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        switch (signal) {
        | Push(value) =>
          f(. value);
          sink(. signal);
        | Start(_)
        | End => sink(. signal)
        }
      )
    )
  );
459
/* Alias of `onPush`. */
[@genType]
let tap = onPush;
462
/* Forwards every signal to the sink and calls `f` right after a Start
   signal has been forwarded. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        sink(. signal);

        switch (signal) {
        | Start(_) => f(.)
        | Push(_)
        | End => ()
        };
      })
    )
  );
477
/* Mutable state shared by the callbacks created inside `sample`. */
type sampleStateT('a) = {
  /* Talkback taken from the source's Start signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback taken from the notifier's Start signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Latest value pushed by the source that has not been emitted yet. */
  mutable value: option('a),
  /* Set when a sink Pull has been forwarded to both inputs. */
  mutable pulled: bool,
  /* Set once the source or notifier ended, or the sink sent Close. */
  mutable ended: bool,
};
485
/* Remembers the latest value pushed by `source` and emits it to the sink
   when `notifier` pushes; each remembered value is emitted at most once.
   When either the source or the notifier ends, the other one is closed and
   the sink receives End.

   `state.pulled` de-duplicates Pulls coming from the sink. It is cleared
   again whenever the source pushes (the same way `buffer` does it), so that
   the sink's following Pull is forwarded instead of being dropped for the
   rest of the subscription. */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) when !state.ended =>
          state.value = Some(x);
          /* The outstanding Pull has been answered; allow the next one. */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          /* Clear the value first so it cannot be emitted twice. */
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );
547
/* Running fold: starting from `seed`, every pushed value is combined with
   the accumulator via `f`, and the new accumulator is pushed to the sink.
   Start and End are forwarded. */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) => sink(. Start(talkback))
        | Push(value) =>
          let next = f(. acc^, value);
          acc := next;
          sink(. Push(next));
        | End => sink(. End)
        }
      );
    })
  );
567
/* Mutable state kept by `share` across all of its subscribers. */
type shareStateT('a) = {
  /* Sinks currently subscribed to the shared source. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Talkback taken from the underlying source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* Set when a Pull was forwarded to the source; cleared on its next Push. */
  mutable gotSignal: bool,
};
573
/* Shares one subscription to `source` between all sinks. The source is
   subscribed to when the sink list grows to exactly one entry; every Push
   and End is broadcast to the sinks registered at that moment. A sink's
   Close removes it from the list, and the source is closed when the list
   becomes empty. Pulls are forwarded at most once until the next Push. */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  subscriber => {
    state.sinks = Rebel.Array.append(state.sinks, subscriber);

    /* Only the first registered sink triggers the upstream subscription. */
    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkback = tb
        | Push(_) =>
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, target => target(. signal));
        | End =>
          Rebel.Array.forEach(state.sinks, target => target(. End));
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    subscriber(.
      Start(
        (. request) =>
          switch (request) {
          | Pull =>
            if (!state.gotSignal) {
              state.gotSignal = true;
              state.talkback(. request);
            }
          | Close =>
            /* Remove this sink (compared by physical identity). */
            state.sinks =
              Rebel.Array.filter(state.sinks, other => other !== subscriber);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          },
      ),
    );
  };
};
617
/* Mutable state used by `skip`. */
type skipStateT = {
  /* Talkback taken from the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* Number of values that still have to be dropped. */
  mutable rest: int,
};
622
/* Drops the first `wait` pushed values, sending a Pull to the source for
   each dropped one; everything after that is forwarded to the sink. */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) =>
          if (state.rest > 0) {
            state.rest = state.rest - 1;
            state.talkback(. Pull);
          } else {
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
642
/* Mutable state shared by the callbacks created inside `skipUntil`. */
type skipUntilStateT = {
  /* Talkback taken from the source's Start signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback taken from the notifier's Start signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* True until the notifier pushes its first value. */
  mutable skip: bool,
  /* Set when a sink Pull was forwarded; cleared when a value is forwarded. */
  mutable pulled: bool,
  /* Set on source End, notifier End while skipping, or sink Close. */
  mutable ended: bool,
};
650
/* Drops the values of `source` until `notifier` pushes for the first time;
   from then on values are forwarded to the sink. The notifier is subscribed
   to when the source starts and is closed as soon as it has pushed. If the
   notifier ends while values are still being skipped, the source is closed.
   NOTE(review): in that last case no End is sent to the sink — confirm this
   is intended. */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(sourceTb) =>
          state.sourceTalkback = sourceTb;

          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(notifierTb) =>
              state.notifierTalkback = notifierTb;
              notifierTb(. Pull);
            | Push(_) =>
              /* First notifier value: stop skipping, notifier is done. */
              state.skip = false;
              state.notifierTalkback(. Close);
            | End =>
              if (state.skip) {
                state.ended = true;
                state.sourceTalkback(. Close);
              }
            }
          );
        | Push(_) =>
          if (!state.skip && !state.ended) {
            state.pulled = false;
            sink(. signal);
          }
        | End =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Close when !state.ended =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (state.skip) {
                state.notifierTalkback(. Close);
              };
            | Pull when !state.ended && !state.pulled =>
              state.pulled = true;
              if (state.skip) {
                state.notifierTalkback(. Pull);
              };
              state.sourceTalkback(. Pull);
            | Close
            | Pull => ()
            },
        ),
      );
    })
  );
719
/* Mutable state used by `skipWhile`. */
type skipWhileStateT = {
  /* Talkback taken from the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* True until the predicate returns false for the first time. */
  mutable skip: bool,
};
724
/* Drops pushed values for as long as `f` returns true for them, sending a
   Pull to the source for each dropped one. The first value for which `f`
   returns false, and every signal after it, is forwarded to the sink; `f`
   is not called again after that. */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(value) =>
          if (!state.skip) {
            sink(. signal);
          } else if (f(. value)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
751
/* Mutable state shared by the callbacks created inside `switchMap`.
   The type parameter is not used by any field. */
type switchMapStateT('a) = {
  /* Talkback taken from the outer source's Start signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Tracks whether a Pull to the outer source is outstanding. */
  mutable outerPulled: bool,
  /* Talkback taken from the current inner source's Start signal. */
  mutable innerTalkback: (. talkbackT) => unit,
  /* True between an inner source's Start and its End (or a sink Close). */
  mutable innerActive: bool,
  /* Tracks whether a sink Pull was already forwarded to the inner source. */
  mutable innerPulled: bool,
  /* Set once the outer source ended or the sink sent Close. */
  mutable ended: bool,
};
760
/* Maps every value of the outer source to an inner source via `f` and
   forwards the values of the most recent inner source only: when the outer
   source pushes while an inner source is active, that inner source is
   closed before the new one is started. The sink receives End once the
   outer source has ended and no inner source is active.

   A Close from the sink closes the inner and the outer source. It is also
   honoured after the outer source has already ended: in that case the inner
   source that is still active gets closed instead of being left running. */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      /* Subscribes to one inner source and makes it the active one. */
      let applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            state.innerPulled = false;
            tb(. Pull);
          | Push(_) when state.innerActive =>
            sink(. signal);
            /* Keep the inner source going unless the sink already pulled
               while handling the value above. */
            if (!state.innerPulled) {
              state.innerTalkback(. Pull);
            } else {
              state.innerPulled = false;
            };
          | Push(_) => ()
          | End when state.innerActive =>
            state.innerActive = false;
            if (state.ended) {
              sink(. signal);
            };
          | End => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) when !state.ended =>
          /* A new outer value replaces the currently active inner source. */
          if (state.innerActive) {
            state.innerTalkback(. Close);
            state.innerTalkback = talkbackPlaceholder;
          };

          if (!state.outerPulled) {
            state.outerPulled = true;
            state.outerTalkback(. Pull);
          } else {
            state.outerPulled = false;
          };

          applyInnerSource(f(. x));
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          if (!state.innerActive) {
            sink(. End);
          };
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close when !state.ended =>
              state.ended = true;
              state.innerActive = false;
              state.innerTalkback(. Close);
              state.outerTalkback(. Close);
            | Close when state.innerActive =>
              /* The outer source already ended, but an inner source is still
                 running: it must be closed too, otherwise it keeps pushing
                 to a sink that asked to stop. */
              state.innerActive = false;
              state.innerTalkback(. Close);
            | Close => ()
            },
        ),
      );
    })
  );
850
/* Flattens a source of sources by running `switchMap` with the identity
   function, i.e. every emitted source is used as-is as the inner source. */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  switchMap((. innerSource) => innerSource, source);
854
/* Mutable state used by `take`. */
type takeStateT = {
  /* Set once End was sent to the sink or the sink sent Close. */
  mutable ended: bool,
  /* Number of values forwarded to the sink so far. */
  mutable taken: int,
  /* Talkback taken from the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
};
860
/* Forwards at most `max` values. After the `max`-th value the sink receives
   End and the source is closed. With `max <= 0` the sink receives End and
   the source is closed as soon as the source starts. */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          if (max <= 0) {
            state.ended = true;
            sink(. End);
            tb(. Close);
          } else {
            state.talkback = tb;
          }
        | Push(_) =>
          if (state.taken < max && !state.ended) {
            state.taken = state.taken + 1;
            sink(. signal);
            /* The sink may have closed while handling the value above, so
               `ended` is checked again before finishing. */
            if (!state.ended && state.taken >= max) {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Pull when !state.ended && state.taken < max =>
              state.talkback(. Pull)
            | Close when !state.ended =>
              state.ended = true;
              state.talkback(. Close);
            | Pull
            | Close => ()
            },
        ),
      );
    })
  );
910
/* Keeps the last `max` values of the source in a queue, pulling the source
   after every value, and hands the queue to `makeTrampoline` once the source
   ends. With `max <= 0` the source is closed on Start and the sink is
   passed to `Wonka_sources.empty` instead. */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      open Rebel;
      let upstream = ref(talkbackPlaceholder);
      let kept = MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          if (max <= 0) {
            tb(. Close);
            Wonka_sources.empty(sink);
          } else {
            upstream := tb;
            tb(. Pull);
          }
        | Push(value) =>
          /* Drop the oldest value once the queue holds `max` entries. */
          if (MutableQueue.size(kept) >= max && max > 0) {
            let _ = MutableQueue.pop(kept);
            ();
          };

          MutableQueue.add(kept, value);
          upstream^(. Pull);
        | End => makeTrampoline(sink, (.) => MutableQueue.pop(kept))
        }
      );
    })
  );
940
/* Mutable state shared by the callbacks created inside `takeUntil`. */
type takeUntilStateT = {
  /* Set once End was sent to the sink or the sink sent Close. */
  mutable ended: bool,
  /* Talkback taken from the source's Start signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback taken from the notifier's Start signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
};
946
/* Forwards the values of `source` until `notifier` pushes for the first
   time. At that point the notifier and the source are both closed and the
   sink receives End. The notifier is subscribed to when the source starts.
   If the source ends first, the notifier is closed and the sink receives
   End. A notifier that ends without pushing has no effect.

   The notifier's Push is handled only once: it is ignored when the operator
   has already ended, so a notifier that keeps emitting cannot send a second
   End to the sink. */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              state.ended = true;
              /* The notifier has done its job; stop it as well so it does
                 not keep running after the stream is over. */
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );
1000
/* Mutable state used by `takeWhile`. */
type takeWhileStateT = {
  /* Talkback taken from the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* Set once End was sent to the sink. */
  mutable ended: bool,
};
1005
/* Forwards values for as long as `f` returns true for them. The first value
   for which `f` returns false is not forwarded; instead the sink receives
   End and the source is closed. Signals arriving after that are ignored. */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(value) =>
          if (!state.ended) {
            if (f(. value)) {
              sink(. signal);
            } else {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );
    })
  );