1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks created inside `takeUntil`. */
type takeUntilStateT = {
  /* Set to true once the operator has terminated; checked before
     forwarding signals in either direction. */
  mutable ended: bool,
  /* Talkback received from the source's `Start` signal. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received from the notifier's `Start` signal. */
  mutable notifierTalkback: (. talkbackT) => unit,
};
9
/* takeUntil(notifier)(source)(sink)

   Forwards every value pushed by `source` to `sink` until `notifier` pushes
   its first value. At that point both the source and the notifier are closed
   and `End` is delivered to the sink.

   - notifier: subscribed to as soon as the source sends `Start`; it is pulled
     once right away. Its `End` signal is ignored, so a notifier that
     completes without pushing leaves the source running.
   - source: its `Push` signals are forwarded while the operator is active;
     its `End` closes the notifier and is forwarded to the sink.
   - sink: receives a talkback that forwards `Pull` to the source and, on
     `Close`, closes both the source and the notifier.

   Once `state.ended` is set, all further signals from the source, the
   notifier and the sink are dropped, so the sink sees `End` at most once. */
let takeUntil = notifier =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        /* Placeholders are replaced when the respective `Start` arrives. */
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* Subscribe to the notifier only once the source has started, so
             the source talkback is available when the notifier fires. */
          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              state.ended = true;
              /* Close the notifier as well: it is no longer needed and
                 would otherwise keep running after termination. */
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            /* Already terminated: never close the source or end the sink a
               second time. */
            | Push(_) => ()
            | End => ()
            }
          );
        | End when !state.ended =>
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );