1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping record used by the `take` operator. */
type takeStateT = {
  /* Number of values forwarded to the sink so far. `take` also sets this to
     its limit to mark the stream as finished. */
  mutable taken: int,
  /* Talkback received from the source's `Start` signal. `take` initialises
     it with `talkbackPlaceholder` until that signal arrives. */
  mutable talkback: (. talkbackT) => unit,
};
8
/**
 * Operator that forwards at most `max` values from `source` to `sink` and
 * then ends the stream.
 *
 * - Once the `max`-th `Push` has been forwarded, `End` is sent to the sink
 *   and `Close` is sent to the source's talkback.
 * - If the source ends before `max` values were seen, its `End` is forwarded
 *   once; later `Push`/`End` signals are ignored.
 * - The talkback handed to the sink relays `Pull` and `Close` upstream only
 *   while fewer than `max` values have been taken; `Close` additionally marks
 *   the operator as finished.
 * - When `max <= 0` no value is ever forwarded: the source is closed as soon
 *   as it starts, and the sink receives `Start` immediately followed by `End`.
 */
let take = max =>
  curry(source =>
    curry(sink => {
      let state: takeStateT = {taken: 0, talkback: talkbackPlaceholder};

      source((. signal) =>
        switch (signal) {
        | Start(tb) when max <= 0 =>
          /* Nothing will ever be forwarded, so release the source right
             away instead of keeping it subscribed forever. */
          tb(. Close)
        | Start(tb) => state.talkback = tb
        | Push(_) when state.taken < max =>
          let taken = state.taken + 1;
          state.taken = taken;
          /* Decide whether this is the final value *before* calling the sink.
             The sink may send `Close` (which sets `taken` to `max`) or cause
             nested pushes from inside its handler; checking afterwards would
             then emit a duplicate `End`/`Close`. */
          let isLast = taken === max;
          sink(. signal);

          if (isLast) {
            sink(. End);
            state.talkback(. Close);
          };
        | Push(_) => ()
        | End when state.taken < max =>
          /* Source finished early: mark as done so nothing else gets through. */
          state.taken = max;
          sink(. End);
        | End => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            /* Once the limit is reached (or the stream was closed/ended) the
               talkback becomes a no-op. */
            if (state.taken < max) {
              switch (signal) {
              | Pull => state.talkback(. Pull)
              | Close =>
                state.taken = max;
                state.talkback(. Close);
              };
            },
        ),
      );

      /* With a non-positive limit every guard above is false from the start,
         so complete the sink explicitly; otherwise it would never see `End`. */
      if (max <= 0) {
        sink(. End);
      };
    })
  );