1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable per-subscription bookkeeping used by the `take` operator. */
type takeStateT = {
  /* Talkback used to send `Pull` / `Close` to the upstream source. `take`
     initialises it with `talkbackPlaceholder` and replaces it once the source
     emits `Start`. */
  mutable talkback: (.talkbackT) => unit,
  /* Counter that starts at 0 and is compared against `take`'s `max` to decide
     whether further values may still pass through. */
  mutable taken: int,
};
8
/* `take(max)` forwards at most `max` `Push` signals from `source` to `sink`,
   then sends `End` to the sink and `Close` to the source.

   - `max`: maximum number of values to let through. When `max <= 0` the sink
     receives `Start` immediately followed by `End`, and the source is closed
     as soon as it hands over its talkback.
   - `source` / `sink`: the upstream source and downstream sink, supplied
     through `curry` exactly as before.

   Termination is tracked in a dedicated `ended` flag instead of overloading
   `state.taken`, so `End` reaches the sink at most once and `Close` reaches
   the source at most once, even if the sink closes or pulls re-entrantly
   while it is handling a `Push`. */
let take = max => curry(source => curry(sink => {
  let state: takeStateT = {
    taken: 0,
    talkback: talkbackPlaceholder
  };

  /* True once the stream is finished in either direction. A non-positive
     limit means it is finished before it starts. */
  let ended = ref(max <= 0);

  source((.signal) => {
    switch (signal) {
    /* Nothing will ever be taken: release the source right away. */
    | Start(tb) when max <= 0 => tb(.Close)
    | Start(tb) => state.talkback = tb
    | Push(_) when !ended^ && state.taken < max => {
      state.taken = state.taken + 1;
      sink(.signal);

      /* Re-check `ended`: the sink may have closed, or a nested push may
         already have completed the stream, while `sink` was running. */
      if (!ended^ && state.taken >= max) {
        ended := true;
        sink(.End);
        state.talkback(.Close);
      };
    }
    | Push(_) => ()
    | End when !ended^ => {
      ended := true;
      sink(.End);
    }
    | End => ()
    }
  });

  sink(.Start((.signal) => {
    if (!ended^) {
      switch (signal) {
      /* Once the limit is reached there is nothing left to request. */
      | Pull when state.taken < max => state.talkback(.Pull)
      | Pull => ()
      | Close => {
        ended := true;
        state.talkback(.Close);
      }
      }
    };
  }));

  /* With a non-positive limit, complete the sink right after its `Start`
     so it does not wait forever for values that can never arrive. */
  if (max <= 0) {
    sink(.End);
  };
}));