1open Wonka_types;
2open Wonka_helpers;
3
/** Mutable bookkeeping for one subscription created by [buffer]. */
type bufferStateT('a) = {
  /* Values pushed by the source since the last flush. */
  mutable buffer: Rebel.MutableQueue.t('a),
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  /* True while a Pull from the sink has been forwarded and not yet
     answered by a source Push. */
  mutable pulled: bool,
  mutable ended: bool,
};

/**
 * Collects the values pushed by the source into a queue and emits them
 * as one array every time [notifier] pushes (empty queues emit nothing).
 * When either the source or the notifier ends, the other one is closed,
 * any queued values are emitted as a final array, and End is sent.
 */
[@genType]
let buffer = (notifier: sourceT('a)): operatorT('b, array('b)) =>
  curry(source =>
    curry(sink => {
      let state = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        pulled: false,
        ended: false,
      };

      /* Emits whatever is still queued (if anything), then End. */
      let flushAndEnd = () => {
        if (Rebel.MutableQueue.size(state.buffer) > 0) {
          sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
        };
        sink(. End);
      };

      let onNotifierSignal = (. notifierSignal) =>
        switch (notifierSignal) {
        | Start(notifierTb) => state.notifierTalkback = notifierTb
        | Push(_) =>
          if (!state.ended && Rebel.MutableQueue.size(state.buffer) > 0) {
            /* Swap in a fresh queue before emitting the old one. */
            let pending = state.buffer;
            state.buffer = Rebel.MutableQueue.make();
            sink(. Push(Rebel.MutableQueue.toArray(pending)));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            state.sourceTalkback(. Close);
            flushAndEnd();
          }
        };

      source((. sourceSignal) =>
        switch (sourceSignal) {
        | Start(sourceTb) =>
          state.sourceTalkback = sourceTb;
          /* The notifier is only subscribed once the source has started. */
          notifier(onNotifierSignal);
        | Push(value) =>
          if (!state.ended) {
            Rebel.MutableQueue.add(state.buffer, value);
            state.pulled = false;
            state.notifierTalkback(. Pull);
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            state.notifierTalkback(. Close);
            flushAndEnd();
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (state.ended, request) {
            | (true, _) => ()
            | (false, Close) =>
              state.ended = true;
              state.sourceTalkback(. Close);
              state.notifierTalkback(. Close);
            | (false, Pull) =>
              if (!state.pulled) {
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              }
            },
        ),
      );
    })
  );
85
/** Mutable bookkeeping for one subscription created by [combine]. */
type combineStateT('a, 'b) = {
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  /* Most recent value seen from each source, if any. */
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  mutable gotSignal: bool,
  /* Number of sources that have already ended. */
  mutable endCounter: int,
  mutable ended: bool,
};

/**
 * Combines two sources into a source of tuples. Once both sources have
 * produced a value, every new value from either side is emitted together
 * with the latest value of the other side. End is forwarded only after
 * both sources have ended.
 */
[@genType]
let combine =
    (sourceA: sourceT('a), sourceB: sourceT('b)): sourceT(('a, 'b)) =>
  curry(sink => {
    let state = {
      talkbackA: talkbackPlaceholder,
      talkbackB: talkbackPlaceholder,
      lastValA: None,
      lastValB: None,
      gotSignal: false,
      endCounter: 0,
      ended: false,
    };

    /* Shared End handling: the first End is only counted, the second one
       terminates the sink (unless it was already terminated). */
    let handleEnd = () =>
      if (state.endCounter < 1) {
        state.endCounter = state.endCounter + 1;
      } else if (!state.ended) {
        state.ended = true;
        sink(. End);
      };

    sourceA((. signalA) =>
      switch (signalA) {
      | Start(tbA) => state.talkbackA = tbA
      | Push(a) =>
        switch (state.lastValB) {
        | None =>
          state.lastValA = Some(a);
          /* B has not produced anything yet: ask it for a value unless a
             sink Pull is already outstanding. */
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackB(. Pull);
          };
        | Some(b) =>
          if (!state.ended) {
            state.lastValA = Some(a);
            state.gotSignal = false;
            sink(. Push((a, b)));
          }
        }
      | End => handleEnd()
      }
    );

    sourceB((. signalB) =>
      switch (signalB) {
      | Start(tbB) => state.talkbackB = tbB
      | Push(b) =>
        switch (state.lastValA) {
        | None =>
          state.lastValB = Some(b);
          if (state.gotSignal) {
            state.gotSignal = false;
          } else {
            state.talkbackA(. Pull);
          };
        | Some(a) =>
          if (!state.ended) {
            state.lastValB = Some(b);
            state.gotSignal = false;
            sink(. Push((a, b)));
          }
        }
      | End => handleEnd()
      }
    );

    sink(.
      Start(
        (. request) =>
          switch (state.ended, request) {
          | (true, _) => ()
          | (false, Close) =>
            state.ended = true;
            state.talkbackA(. Close);
            state.talkbackB(. Close);
          | (false, Pull) =>
            if (!state.gotSignal) {
              state.gotSignal = true;
              state.talkbackA(. Pull);
              state.talkbackB(. Pull);
            }
          },
      ),
    );
  });
175
/** Mutable bookkeeping for one subscription created by [concatMap]. */
type concatMapStateT('a) = {
  /* Outer values waiting for the current inner source to finish. */
  inputQueue: Rebel.MutableQueue.t('a),
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/**
 * Maps every outer value to an inner source via [f] and runs the inner
 * sources one after another: while an inner source is active, further
 * outer values are queued and started only when the active one ends.
 * End is sent once the outer source has ended, no inner source is
 * active and the queue is empty.
 */
[@genType]
let concatMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      /* Subscribes to one inner source; recursive because the end of an
         inner source starts the next queued one. */
      let rec applyInnerSource = innerSource =>
        innerSource((. innerSignal) =>
          switch (innerSignal) {
          | Start(innerTb) =>
            state.innerActive = true;
            state.innerTalkback = innerTb;
            state.innerPulled = false;
            innerTb(. Pull);
          | Push(_) =>
            if (state.innerActive) {
              sink(. innerSignal);
              /* Keep the inner source flowing unless the sink already
                 pulled it while handling the value. */
              if (state.innerPulled) {
                state.innerPulled = false;
              } else {
                state.innerTalkback(. Pull);
              };
            }
          | End =>
            if (state.innerActive) {
              state.innerActive = false;
              switch (Rebel.MutableQueue.pop(state.inputQueue)) {
              | Some(nextInput) => applyInnerSource(f(. nextInput))
              | None =>
                if (state.ended) {
                  sink(. End);
                }
              };
            }
          }
        );

      source((. outerSignal) =>
        switch (outerSignal) {
        | Start(outerTb) => state.outerTalkback = outerTb
        | Push(x) =>
          if (!state.ended) {
            if (state.outerPulled) {
              state.outerPulled = false;
            } else {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };

            if (state.innerActive) {
              Rebel.MutableQueue.add(state.inputQueue, x);
            } else {
              applyInnerSource(f(. x));
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            let idle =
              !state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue);
            if (idle) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.innerActive = false;
                state.innerTalkback(. Close);
                state.outerTalkback(. Close);
              }
            },
        ),
      );
    })
  );
278
/** Flattens a source of sources by running the inner sources one after
    another ([concatMap] with the identity function). */
[@genType]
let concatAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  source |> concatMap((. inner) => inner);
282
/** Runs the given sources one after another, in array order, as a
    single source. */
[@genType]
let concat = (sources: array(sourceT('a))): sourceT('a) =>
  Wonka_sources.fromArray(sources) |> concatMap((. inner) => inner);
286
/**
 * Forwards only the pushed values for which [f] returns true. A rejected
 * value is answered with a Pull to the source so that the next value is
 * requested. Start and End are forwarded unchanged.
 */
[@genType]
let filter = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let upstream = ref(talkbackPlaceholder);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          upstream := tb;
          sink(. signal);
        | Push(x) =>
          if (f(. x)) {
            sink(. signal);
          } else {
            upstream^(. Pull);
          }
        | End => sink(. signal)
        }
      );
    })
  );
304
/**
 * Transforms every pushed value with [f] and forwards the result;
 * Start and End are forwarded as-is.
 *
 * Start and End are rebuilt instead of passing the incoming signal
 * through: returning the input signal from a branch would force the
 * input and output signal types to be the same and thereby restrict
 * [f] to type-preserving functions.
 */
[@genType]
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
  curry(source =>
    curry(sink =>
      source((. signal) =>
        sink(.
          switch (signal) {
          | Start(tb) => Start(tb)
          | Push(x) => Push(f(. x))
          | End => End
          },
        )
      )
    )
  );
319
/** Mutable bookkeeping for one subscription created by [mergeMap]. */
type mergeMapStateT = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  /* Talkbacks of all inner sources that are currently running. */
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  mutable ended: bool,
};

/**
 * Maps every outer value to an inner source via [f] and forwards the
 * values of all inner sources as they arrive. End is sent once the outer
 * source has ended and no inner source is left running.
 */
[@genType]
let mergeMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      let innerCount = () => Rebel.Array.size(state.innerTalkbacks);

      let applyInnerSource = innerSource => {
        /* Talkback of this particular inner source. */
        let ownTalkback = ref(talkbackPlaceholder);

        innerSource((. innerSignal) =>
          switch (innerSignal) {
          | Start(tb) =>
            ownTalkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) =>
            if (innerCount() !== 0) {
              sink(. Push(x));
              ownTalkback^(. Pull);
            }
          | End =>
            if (innerCount() !== 0) {
              state.innerTalkbacks =
                Rebel.Array.filter(state.innerTalkbacks, other =>
                  other !== ownTalkback^
                );
              if (state.ended && innerCount() === 0) {
                sink(. End);
              };
            }
          }
        );
      };

      source((. outerSignal) =>
        switch (outerSignal) {
        | Start(tb) => state.outerTalkback = tb
        | Push(x) =>
          if (!state.ended) {
            state.outerPulled = false;
            applyInnerSource(f(. x));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            if (innerCount() === 0) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | _ when state.ended => ()
            | Close =>
              /* Detach the list first so inner signals arriving while
                 closing are ignored. */
              let running = state.innerTalkbacks;
              state.ended = true;
              state.innerTalkbacks = Rebel.Array.makeEmpty();
              state.outerTalkback(. Close);
              Rebel.Array.forEach(running, tb => tb(. Close));
            | Pull =>
              if (!state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };

              Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
            },
        ),
      );
    })
  );
403
/** Merges the given sources into one source that forwards the values of
    all of them. */
[@genType]
let merge = (sources: array(sourceT('a))): sourceT('a) =>
  Wonka_sources.fromArray(sources) |> mergeMap((. inner) => inner);
407
/** Flattens a source of sources by merging all inner sources
    ([mergeMap] with the identity function). */
[@genType]
let mergeAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  source |> mergeMap((. inner) => inner);
411
/** Alias of [mergeAll]. */
[@genType]
let flatten = (source: sourceT(sourceT('a))): sourceT('a) =>
  mergeAll(source);
414
/**
 * Runs the side effect [f] when the stream terminates, i.e. when the
 * source sends End or when the sink sends Close.
 *
 * [f] is invoked at most once: whichever of End / Close happens first
 * wins, and a later End or Close is swallowed. Previously a sink that
 * closed after receiving End (or a source that sent End after being
 * closed) triggered [f] twice, and End was forwarded to a sink that had
 * already closed. Push and Pull are always forwarded unchanged.
 */
[@genType]
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Set as soon as the stream has terminated in either direction. */
      let ended = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          sink(.
            Start(
              (. request) =>
                switch (request) {
                | Close when ! ended^ =>
                  ended := true;
                  f(.);
                  talkback(. request);
                | Close => ()
                | Pull => talkback(. request)
                },
            ),
          )
        | End when ! ended^ =>
          ended := true;
          sink(. signal);
          f(.);
        | End => ()
        | Push(_) => sink(. signal)
        }
      )
    })
  );
441
/** Calls the side effect [f] with every pushed value before forwarding
    the signal; all signals are passed to the sink unchanged. */
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        switch (signal) {
        | Push(value) => f(. value)
        | Start(_)
        | End => ()
        };

        sink(. signal);
      })
    )
  );
456
/** Alias of [onPush]. */
[@genType]
let tap = (f: (. 'a) => unit): operatorT('a, 'a) => onPush(f);
459
/** Calls the side effect [f] right after the Start signal has been
    forwarded to the sink; all signals are passed through unchanged. */
[@genType]
let onStart = (f: (. unit) => unit): operatorT('a, 'a) =>
  curry(source =>
    curry(sink =>
      source((. signal) => {
        sink(. signal);

        switch (signal) {
        | Start(_) => f(.)
        | Push(_)
        | End => ()
        };
      })
    )
  );
474
/** Mutable bookkeeping for one subscription created by [sample]. */
type sampleStateT('a) = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Latest source value that has not been emitted yet. */
  mutable value: option('a),
  /* True while a sink Pull has been forwarded and the source has not
     answered it with a Push yet. */
  mutable pulled: bool,
  mutable ended: bool,
};

/**
 * Remembers the latest value pushed by the source and emits it whenever
 * [notifier] pushes (each value is emitted at most once). When either
 * the source or the notifier ends, the other one is closed and End is
 * sent.
 *
 * Fixes over the previous version:
 * - [pulled] is reset when the source pushes (as [buffer] does), so Pull
 *   signals from the sink keep reaching the source and the notifier
 *   after the first one instead of being dropped forever;
 * - source pushes arriving after the operator has ended are ignored
 *   instead of being recorded and pulling the closed notifier.
 */
[@genType]
let sample = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        value: None,
        pulled: false,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | Push(x) when !state.ended =>
          state.value = Some(x);
          /* The outstanding Pull has been answered; allow the next one. */
          state.pulled = false;
          state.notifierTalkback(. Pull);
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) => state.notifierTalkback = tb
        | (End, _) when !state.ended =>
          state.ended = true;
          state.sourceTalkback(. Close);
          sink(. End);
        | (End, _) => ()
        | (Push(_), Some(x)) when !state.ended =>
          /* Clear first so the same value is never emitted twice. */
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Pull when !state.pulled =>
                state.pulled = true;
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Pull => ()
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );
544
/**
 * Running fold over the source: starting from [seed], every pushed value
 * is combined with the accumulator via [f] and the new accumulator is
 * pushed to the sink. Start and End are forwarded.
 */
[@genType]
let scan = (f: (. 'acc, 'a) => 'acc, seed: 'acc): operatorT('a, 'acc) =>
  curry(source =>
    curry(sink => {
      let acc = ref(seed);

      source((. signal) =>
        switch (signal) {
        | Start(tb) => sink(. Start(tb))
        | Push(x) =>
          let next = f(. acc^, x);
          acc := next;
          sink(. Push(next));
        | End => sink(. End)
        }
      );
    })
  );
564
/** State shared by every subscriber of a source returned by [share]. */
type shareStateT('a) = {
  /* Sinks currently subscribed to the shared source. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable talkback: (. talkbackT) => unit,
  /* True while a Pull has been forwarded and no Push has arrived yet. */
  mutable gotSignal: bool,
};

/**
 * Shares one subscription to [source] between all sinks. The underlying
 * source is subscribed when the sink list goes from empty to one entry
 * and closed when the last sink closes. Push and End are delivered to
 * every registered sink; End also clears the sink list.
 */
[@genType]
let share = (source: sourceT('a)): sourceT('a) => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  /* Delivers a signal to every sink that is registered right now. */
  let broadcast = signal =>
    Rebel.Array.forEach(state.sinks, target => target(. signal));

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    let isFirstSink = Rebel.Array.size(state.sinks) === 1;
    if (isFirstSink) {
      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkback = tb
        | Push(_) =>
          state.gotSignal = false;
          broadcast(signal);
        | End =>
          broadcast(End);
          state.sinks = Rebel.Array.makeEmpty();
        }
      );
    };

    sink(.
      Start(
        (. request) =>
          switch (request) {
          | Pull =>
            if (!state.gotSignal) {
              state.gotSignal = true;
              state.talkback(. request);
            }
          | Close =>
            state.sinks =
              Rebel.Array.filter(state.sinks, other => other !== sink);
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          },
      ),
    );
  };
};
614
/** Mutable bookkeeping for one subscription created by [skip]. */
type skipStateT = {
  mutable talkback: (. talkbackT) => unit,
  /* Number of values that still have to be dropped. */
  mutable rest: int,
};

/**
 * Drops the first [wait] pushed values and forwards everything after
 * them. Each dropped value is answered with a Pull to the source.
 */
[@genType]
let skip = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipStateT = {talkback: talkbackPlaceholder, rest: wait};

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(_) =>
          if (state.rest > 0) {
            state.rest = state.rest - 1;
            state.talkback(. Pull);
          } else {
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
639
/** Mutable bookkeeping for one subscription created by [skipUntil]. */
type skipUntilStateT = {
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
  /* True until the notifier has pushed its first value. */
  mutable skip: bool,
  mutable pulled: bool,
  mutable ended: bool,
};

/**
 * Drops the values of the source until [notifier] pushes for the first
 * time; from then on values are forwarded and the notifier is closed.
 * If the notifier ends before pushing anything, the source is closed.
 */
[@genType]
let skipUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: skipUntilStateT = {
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        skip: true,
        pulled: false,
        ended: false,
      };

      let onNotifierSignal = (. notifierSignal) =>
        switch (notifierSignal) {
        | Start(notifierTb) =>
          state.notifierTalkback = notifierTb;
          notifierTb(. Pull);
        | Push(_) =>
          state.skip = false;
          state.notifierTalkback(. Close);
        | End =>
          if (state.skip) {
            state.ended = true;
            state.sourceTalkback(. Close);
          }
        };

      source((. sourceSignal) =>
        switch (sourceSignal) {
        | Start(sourceTb) =>
          state.sourceTalkback = sourceTb;
          /* The notifier is subscribed once the source has started. */
          notifier(onNotifierSignal);
        | Push(_) =>
          if (!(state.skip || state.ended)) {
            state.pulled = false;
            sink(. sourceSignal);
          }
        | End =>
          if (state.skip) {
            state.notifierTalkback(. Close);
          };
          state.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (state.ended, request) {
            | (true, _) => ()
            | (false, Close) =>
              state.ended = true;
              state.sourceTalkback(. Close);
              if (state.skip) {
                state.notifierTalkback(. Close);
              };
            | (false, Pull) =>
              if (!state.pulled) {
                state.pulled = true;
                if (state.skip) {
                  state.notifierTalkback(. Pull);
                };
                state.sourceTalkback(. Pull);
              }
            },
        ),
      );
    })
  );
716
/** Mutable bookkeeping for one subscription created by [skipWhile]. */
type skipWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  /* True until the predicate has returned false for the first time. */
  mutable skip: bool,
};

/**
 * Drops pushed values for as long as [f] returns true. The first value
 * for which [f] returns false, and every value after it, is forwarded;
 * [f] is not consulted again afterwards. Each dropped value is answered
 * with a Pull to the source.
 */
[@genType]
let skipWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: skipWhileStateT = {
        talkback: talkbackPlaceholder,
        skip: true,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) =>
          /* && short-circuits, so [f] only runs while still skipping. */
          if (state.skip && f(. x)) {
            state.talkback(. Pull);
          } else {
            state.skip = false;
            sink(. signal);
          }
        | End => sink(. signal)
        }
      );
    })
  );
748
/** Mutable bookkeeping for one subscription created by [switchMap]. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable outerPulled: bool,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable innerPulled: bool,
  mutable ended: bool,
};

/**
 * Maps every outer value to an inner source via [f] and forwards the
 * values of the most recent inner source only: when a new outer value
 * arrives, the active inner source is closed and replaced. End is sent
 * once the outer source has ended and no inner source is active.
 */
[@genType]
let switchMap = (f: (. 'a) => sourceT('b)): operatorT('a, 'b) =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        outerPulled: false,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        innerPulled: false,
        ended: false,
      };

      let applyInnerSource = innerSource =>
        innerSource((. innerSignal) =>
          switch (innerSignal) {
          | Start(innerTb) =>
            state.innerActive = true;
            state.innerTalkback = innerTb;
            state.innerPulled = false;
            innerTb(. Pull);
          | Push(_) =>
            if (state.innerActive) {
              sink(. innerSignal);
              /* Keep the inner source flowing unless the sink already
                 pulled it while handling the value. */
              if (state.innerPulled) {
                state.innerPulled = false;
              } else {
                state.innerTalkback(. Pull);
              };
            }
          | End =>
            if (state.innerActive) {
              state.innerActive = false;
              if (state.ended) {
                sink(. innerSignal);
              };
            }
          }
        );

      source((. outerSignal) =>
        switch (outerSignal) {
        | Start(outerTb) => state.outerTalkback = outerTb
        | Push(x) =>
          if (!state.ended) {
            /* Drop the previous inner source before switching. */
            if (state.innerActive) {
              state.innerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
            };

            if (state.outerPulled) {
              state.outerPulled = false;
            } else {
              state.outerPulled = true;
              state.outerTalkback(. Pull);
            };

            applyInnerSource(f(. x));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            if (!state.innerActive) {
              sink(. End);
            };
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Pull =>
              if (!state.ended && !state.outerPulled) {
                state.outerPulled = true;
                state.outerTalkback(. Pull);
              };
              if (state.innerActive && !state.innerPulled) {
                state.innerPulled = true;
                state.innerTalkback(. Pull);
              };
            | Close =>
              if (!state.ended) {
                state.ended = true;
                state.innerActive = false;
                state.innerTalkback(. Close);
                state.outerTalkback(. Close);
              }
            },
        ),
      );
    })
  );
847
/** Flattens a source of sources by always following the most recent
    inner source ([switchMap] with the identity function). */
[@genType]
let switchAll = (source: sourceT(sourceT('a))): sourceT('a) =>
  source |> switchMap((. inner) => inner);
851
/** Mutable bookkeeping for one subscription created by [take]. */
type takeStateT = {
  mutable ended: bool,
  /* Number of values forwarded so far. */
  mutable taken: int,
  mutable talkback: (. talkbackT) => unit,
};

/**
 * Forwards at most [max] values and then ends: after the last allowed
 * value End is sent to the sink and the source is closed. For
 * [max <= 0] End is sent and the source is closed as soon as the source
 * starts.
 */
[@genType]
let take = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {
        ended: false,
        taken: 0,
        talkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          if (max <= 0) {
            state.ended = true;
            sink(. End);
            tb(. Close);
          } else {
            state.talkback = tb;
          }
        | Push(_) =>
          if (state.taken < max && !state.ended) {
            state.taken = state.taken + 1;
            sink(. signal);
            /* Re-check [ended]: the sink may have closed while it was
               handling the value. */
            if (!state.ended && state.taken >= max) {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (state.ended, request) {
            | (true, _) => ()
            | (false, Pull) =>
              if (state.taken < max) {
                state.talkback(. Pull);
              }
            | (false, Close) =>
              state.ended = true;
              state.talkback(. Close);
            },
        ),
      );
    })
  );
907
/**
 * Consumes the whole source while keeping only the last [max] values in
 * a queue, and hands the queued values to the sink via [makeTrampoline]
 * once the source has ended. For [max <= 0] the source is closed
 * immediately and the sink is given [Wonka_sources.empty].
 */
[@genType]
let takeLast = (max: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let upstream = ref(talkbackPlaceholder);
      let kept = Rebel.MutableQueue.make();

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          if (max <= 0) {
            tb(. Close);
            Wonka_sources.empty(sink);
          } else {
            upstream := tb;
            tb(. Pull);
          }
        | Push(x) =>
          /* Evict the oldest value once the window is full. */
          let isFull = Rebel.MutableQueue.size(kept) >= max;
          if (isFull && max > 0) {
            ignore(Rebel.MutableQueue.pop(kept));
          };

          Rebel.MutableQueue.add(kept, x);
          upstream^(. Pull);
        | End => makeTrampoline(sink, (.) => Rebel.MutableQueue.pop(kept))
        }
      );
    })
  );
937
/** Mutable bookkeeping for one subscription created by [takeUntil]. */
type takeUntilStateT = {
  mutable ended: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/**
 * Forwards the values of the source until [notifier] pushes for the
 * first time; at that point the stream is ended. An End of the notifier
 * itself is ignored.
 *
 * Fixes over the previous version:
 * - when the notifier pushes, the notifier is closed as well. Before,
 *   only the source was closed, and because [ended] was already set no
 *   later Close from the sink could reach the notifier, so it was left
 *   running;
 * - a notifier Push arriving after the operator has ended is ignored, so
 *   the sink can no longer receive End twice.
 */
[@genType]
let takeUntil = (notifier: sourceT('a)): operatorT('b, 'b) =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* The notifier is subscribed once the source has started and
             is pulled right away. */
          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              state.ended = true;
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );
997
/** Mutable bookkeeping for one subscription created by [takeWhile]. */
type takeWhileStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};

/**
 * Forwards pushed values for as long as [f] returns true. The first
 * value for which [f] returns false is dropped, End is sent to the sink
 * and the source is closed.
 */
[@genType]
let takeWhile = (f: (. 'a) => bool): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: takeWhileStateT = {
        talkback: talkbackPlaceholder,
        ended: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.talkback = tb;
          sink(. signal);
        | Push(x) =>
          if (!state.ended) {
            if (f(. x)) {
              sink(. signal);
            } else {
              state.ended = true;
              sink(. End);
              state.talkback(. Close);
            };
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          }
        }
      );
    })
  );