1open Wonka_types;
2open Wonka_helpers;
3
4type skipUntilStateT = {
5 mutable skip: bool,
6 mutable ended: bool,
7 mutable gotSignal: bool,
8 mutable sourceTalkback: (. talkbackT) => unit,
9 mutable notifierTalkback: (. talkbackT) => unit,
10};
11
12let skipUntil = notifier =>
13 curry(source =>
14 curry(sink => {
15 let state: skipUntilStateT = {
16 skip: true,
17 ended: false,
18 gotSignal: false,
19 sourceTalkback: talkbackPlaceholder,
20 notifierTalkback: talkbackPlaceholder,
21 };
22
23 source((. signal) =>
24 switch (signal) {
25 | Start(tb) =>
26 state.sourceTalkback = tb;
27
28 notifier((. signal) =>
29 switch (signal) {
30 | Start(innerTb) =>
31 state.notifierTalkback = innerTb;
32 innerTb(. Pull);
33 tb(. Pull);
34 | Push(_) =>
35 state.skip = false;
36 state.notifierTalkback(. Close);
37 | End => ()
38 }
39 );
40 | Push(_) when state.skip && !state.ended =>
41 state.sourceTalkback(. Pull)
42 | Push(_) when !state.ended =>
43 state.gotSignal = false;
44 sink(. signal);
45 | Push(_) => ()
46 | End =>
47 if (state.skip) {
48 state.notifierTalkback(. Close);
49 };
50 state.ended = true;
51 sink(. End);
52 }
53 );
54
55 sink(.
56 Start(
57 (. signal) =>
58 switch (signal) {
59 | Close =>
60 if (state.skip) {
61 state.notifierTalkback(. Close);
62 };
63 state.ended = true;
64 state.sourceTalkback(. Close);
65 | Pull when !state.gotSignal && !state.ended =>
66 state.gotSignal = true;
67 state.sourceTalkback(. Pull);
68 | Pull => ()
69 },
70 ),
71 );
72 })
73 );