1open Wonka_types;
2open Wonka_helpers;
3
4type takeUntilStateT = {
5 mutable ended: bool,
6 mutable sourceTalkback: (.talkbackT) => unit,
7 mutable notifierTalkback: (.talkbackT) => unit
8};
9
10let takeUntil = notifier => curry(source => curry(sink => {
11 let state: takeUntilStateT = {
12 ended: false,
13 sourceTalkback: talkbackPlaceholder,
14 notifierTalkback: talkbackPlaceholder
15 };
16
17 source((.signal) => {
18 switch (signal) {
19 | Start(tb) => {
20 state.sourceTalkback = tb;
21
22 notifier((.signal) => {
23 switch (signal) {
24 | Start(innerTb) => {
25 state.notifierTalkback = innerTb;
26 innerTb(.Pull);
27 }
28 | Push(_) => {
29 state.ended = true;
30 state.notifierTalkback(.Close);
31 state.sourceTalkback(.Close);
32 sink(.End);
33 }
34 | End => ()
35 }
36 });
37 }
38 | End when !state.ended => {
39 state.notifierTalkback(.Close);
40 state.ended = true;
41 sink(.End);
42 }
43 | End => ()
44 | Push(_) when !state.ended => sink(.signal)
45 | Push(_) => ()
46 }
47 });
48
49 sink(.Start((.signal) => {
50 if (!state.ended) {
51 switch (signal) {
52 | Close => {
53 state.sourceTalkback(.Close);
54 state.notifierTalkback(.Close);
55 }
56 | Pull => state.sourceTalkback(.Pull)
57 }
58 };
59 }));
60}));