1open Wonka_types;
2
/* Mutable bookkeeping shared by the three callbacks that `sample` wires
   together (source listener, notifier listener, and the sink's talkback). */
type sampleStateT('a) = {
  /* Set to true once the source or the notifier has ended, or the sink has
     sent Close. */
  mutable ended: bool,
  /* Most recent value pushed by the source that has not been emitted yet;
     reset to None each time it is handed to the sink. */
  mutable value: option('a),
  /* Talkback received in the source's Start signal (a no-op until then). */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received in the notifier's Start signal (a no-op until then). */
  mutable notifierTalkback: (. talkbackT) => unit,
};
9
/* sample(notifier)(source)(sink)

   Remembers the latest value pushed by `source` and forwards it to `sink`
   each time `notifier` pushes. A remembered value is emitted at most once:
   it is cleared as soon as it has been forwarded, so a notifier push with no
   new source value in between emits nothing.

   Termination:
   - When either input ends, the other one is closed and the sink receives
     End.
   - When the sink sends Close, both inputs are closed.
   - The sink receives End at most once and never after it has sent Close;
     once terminated, further talkback signals from the sink are ignored. */
let sample = notifier => curry(source => curry(sink => {
  let state = {
    ended: false,
    value: None,
    sourceTalkback: (._: talkbackT) => (),
    notifierTalkback: (._: talkbackT) => ()
  };

  source((.signal) => {
    switch (signal) {
    | Start(tb) => state.sourceTalkback = tb
    | End when !state.ended => {
      state.ended = true;
      state.notifierTalkback(.Close);
      sink(.End);
    }
    /* Already terminated (the notifier ended first or the sink closed):
       do not deliver a second End downstream. */
    | End => ()
    /* Only remember the value; emission is driven by the notifier. */
    | Push(x) => state.value = Some(x)
    }
  });

  notifier((.signal) => {
    switch (signal, state.value) {
    | (Start(tb), _) => state.notifierTalkback = tb
    | (End, _) when !state.ended => {
      state.ended = true;
      state.sourceTalkback(.Close);
      sink(.End);
    }
    /* Already terminated: swallow the late End. */
    | (End, _) => ()
    | (Push(_), Some(x)) when !state.ended => {
      /* Clear before emitting so the same value is never sampled twice. */
      state.value = None;
      sink(.Push(x));
    }
    /* Nothing buffered, or already terminated: ignore the tick. */
    | (Push(_), _) => ()
    }
  });

  sink(.Start((.signal) => {
    /* After termination both inputs have already been closed (or have
       ended), so nothing is forwarded upstream any more. */
    if (!state.ended) {
      switch (signal) {
      | Pull => {
        state.sourceTalkback(.Pull);
        state.notifierTalkback(.Pull);
      }
      | Close => {
        state.ended = true;
        state.sourceTalkback(.Close);
        state.notifierTalkback(.Close);
      }
      }
    }
  }));
}));