1open Wonka_types;
2open Wonka_helpers;
3
4type skipUntilStateT = {
5 mutable skip: bool,
6 mutable ended: bool,
7 mutable gotSignal: bool,
8 mutable sourceTalkback: (.talkbackT) => unit,
9 mutable notifierTalkback: (.talkbackT) => unit
10};
11
12let skipUntil = notifier => curry(source => curry(sink => {
13 let state: skipUntilStateT = {
14 skip: true,
15 ended: false,
16 gotSignal: false,
17 sourceTalkback: talkbackPlaceholder,
18 notifierTalkback: talkbackPlaceholder
19 };
20
21 source((.signal) => {
22 switch (signal) {
23 | Start(tb) => {
24 state.sourceTalkback = tb;
25
26 notifier((.signal) => {
27 switch (signal) {
28 | Start(innerTb) => {
29 state.notifierTalkback = innerTb;
30 innerTb(.Pull);
31 tb(.Pull);
32 }
33 | Push(_) => {
34 state.skip = false;
35 state.notifierTalkback(.Close);
36 }
37 | End => ()
38 }
39 });
40 }
41 | Push(_) when state.skip && !state.ended => state.sourceTalkback(.Pull)
42 | Push(_) when !state.ended => {
43 state.gotSignal = false;
44 sink(.signal)
45 }
46 | Push(_) => ()
47 | End => {
48 if (state.skip) state.notifierTalkback(.Close);
49 state.ended = true;
50 sink(.End)
51 }
52 }
53 });
54
55 sink(.Start((.signal) => {
56 switch (signal) {
57 | Close => {
58 if (state.skip) state.notifierTalkback(.Close);
59 state.ended = true;
60 state.sourceTalkback(.Close);
61 }
62 | Pull when !state.gotSignal && !state.ended => {
63 state.gotSignal = true;
64 state.sourceTalkback(.Pull);
65 }
66 | Pull => ()
67 }
68 }));
69}));