1open Wonka_types;
2
/* Mutable bookkeeping shared by the callbacks that `sample` installs on the
   source, the notifier and the sink. */
type sampleStateT('a) = {
  /* Set to true once the sampled stream has terminated (either input sent
     `End`, or the sink sent `Close`). */
  mutable ended: bool,
  /* Most recent value pushed by the source that has not been emitted yet;
     `None` when there is nothing pending. */
  mutable value: option('a),
  /* Talkback handed over by the source in its `Start` signal; a no-op until
     that signal arrives. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback handed over by the notifier in its `Start` signal; a no-op until
     that signal arrives. */
  mutable notifierTalkback: (. talkbackT) => unit,
};
9
/* `sample(notifier)` builds an operator that remembers the latest value
   pushed by `source` and forwards it to `sink` whenever `notifier` pushes.

   - Every `Push` from the source overwrites the pending value; nothing is
     emitted at that point.
   - Every `Push` from the notifier emits the pending value (if any) and then
     clears it, so one source value is emitted at most once.
   - When either input ends, the other one is closed and the sink receives a
     single `End`.
   - `Pull` and `Close` coming from the sink are forwarded to both inputs.

   Once the stream has terminated (`state.ended`), all further `End`, `Pull`
   and `Close` signals are ignored, so the sink never sees `End` twice and the
   inputs are never closed twice or pulled after being closed. */
let sample = notifier =>
  curry(source =>
    curry(sink => {
      let state = {
        ended: false,
        value: None,
        sourceTalkback: (. _: talkbackT) => (),
        notifierTalkback: (. _: talkbackT) => (),
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.sourceTalkback = tb
        | End =>
          /* Ignore a late End, e.g. after the sink closed the stream or the
             notifier already ended it. */
          if (!state.ended) {
            state.ended = true;
            state.notifierTalkback(. Close);
            sink(. End);
          }
        | Push(x) => state.value = Some(x)
        }
      );

      notifier((. signal) =>
        switch (signal, state.value) {
        | (Start(tb), _) =>
          state.notifierTalkback = tb;
          /* The source may already have ended while it was being subscribed
             above; back then the Close went to the placeholder talkback, so
             release the notifier now that its real talkback is known. */
          if (state.ended) {
            tb(. Close);
          };
        | (End, _) =>
          if (!state.ended) {
            state.ended = true;
            state.sourceTalkback(. Close);
            sink(. End);
          }
        | (Push(_), Some(x)) when !state.ended =>
          /* Clear before emitting so the same value is never sampled twice. */
          state.value = None;
          sink(. Push(x));
        | (Push(_), _) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            /* After termination both inputs are already closed; do not
               forward anything to them. */
            if (!state.ended) {
              switch (signal) {
              | Pull =>
                state.sourceTalkback(. Pull);
                state.notifierTalkback(. Pull);
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              };
            },
        ),
      );
    })
  );