1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks that `takeUntil` installs on
   the source, the notifier and the sink. */
type takeUntilStateT = {
  /* Flipped to true when the operator terminates; the signal handlers check
     it before forwarding anything to the sink or to the upstream talkbacks. */
  mutable ended: bool,
  /* Talkback received from the source's `Start` signal; holds
     `talkbackPlaceholder` until that signal arrives. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback received from the notifier's `Start` signal; holds
     `talkbackPlaceholder` until that signal arrives. */
  mutable notifierTalkback: (. talkbackT) => unit,
};
9
/* `takeUntil(notifier, source)` forwards values pushed by `source` to the
   sink until `notifier` pushes its first value. At that point both upstream
   talkbacks are closed and the sink receives `End`.

   The operator also terminates when `source` ends on its own or when the
   sink sends `Close`. On every termination path `state.ended` is set BEFORE
   any outward call, so a callback that re-enters synchronously already sees
   the operator as finished. This guarantees that:
     - the sink receives at most one `End`,
     - nothing is delivered to the sink after it has sent `Close`,
     - each upstream talkback receives `Close` at most once from here. */
let takeUntil = notifier =>
  curry(source =>
    curry(sink => {
      let state: takeUntilStateT = {
        ended: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          state.sourceTalkback = tb;

          /* Subscribe to the notifier only once the source has started, so
             that there is a source talkback to close when it fires. */
          notifier((. signal) =>
            switch (signal) {
            | Start(innerTb) =>
              state.notifierTalkback = innerTb;
              /* Request the notifier's first value right away. */
              innerTb(. Pull);
            | Push(_) when !state.ended =>
              /* First notifier value while still live: tear everything down. */
              state.ended = true;
              state.notifierTalkback(. Close);
              state.sourceTalkback(. Close);
              sink(. End);
            /* A notifier value after termination, or the notifier completing
               without ever emitting, changes nothing. */
            | Push(_)
            | End => ()
            }
          );
        | End when !state.ended =>
          /* Source completed first: mark as ended before closing the
             notifier so a synchronous reaction cannot terminate us twice. */
          state.ended = true;
          state.notifierTalkback(. Close);
          sink(. End);
        | End => ()
        | Push(_) when !state.ended => sink(. signal)
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            if (!state.ended) {
              switch (signal) {
              | Close =>
                /* The sink is gone: record that so late source or notifier
                   signals are dropped instead of being forwarded to it. */
                state.ended = true;
                state.sourceTalkback(. Close);
                state.notifierTalkback(. Close);
              | Pull => state.sourceTalkback(. Pull)
              };
            },
        ),
      );
    })
  );