1open Wonka_types;
2open Wonka_helpers;
3
4type switchMapStateT('a) = {
5 mutable outerTalkback: (.talkbackT) => unit,
6 mutable innerTalkback: (.talkbackT) => unit,
7 mutable innerActive: bool,
8 mutable closed: bool,
9 mutable ended: bool
10};
11
12let switchMap = f => curry(source => curry(sink => {
13 let state: switchMapStateT('a) = {
14 outerTalkback: talkbackPlaceholder,
15 innerTalkback: talkbackPlaceholder,
16 innerActive: false,
17 closed: false,
18 ended: false
19 };
20
21 let applyInnerSource = innerSource =>
22 innerSource((.signal) => {
23 switch (signal) {
24 | End => {
25 state.innerActive = false;
26 state.innerTalkback = talkbackPlaceholder;
27 if (state.ended) sink(.End);
28 }
29 | Start(tb) => {
30 state.innerActive = true;
31 state.innerTalkback = tb;
32 tb(.Pull);
33 }
34 | Push(x) when !state.closed => {
35 sink(.Push(x));
36 state.innerTalkback(.Pull);
37 }
38 | Push(_) => ()
39 }
40 });
41
42 source((.signal) => {
43 switch (signal) {
44 | End when !state.ended => {
45 state.ended = true;
46 if (!state.innerActive) sink(.End);
47 }
48 | End => ()
49 | Start(tb) => {
50 state.outerTalkback = tb;
51 tb(.Pull);
52 }
53 | Push(x) when !state.ended => {
54 if (state.innerActive) {
55 state.innerTalkback(.Close);
56 state.innerTalkback = talkbackPlaceholder;
57 }
58 applyInnerSource(f(.x));
59 state.outerTalkback(.Pull);
60 }
61 | Push(_) => ()
62 }
63 });
64
65 sink(.Start((.signal) => {
66 switch (signal) {
67 | Pull => state.innerTalkback(.Pull)
68 | Close => {
69 state.innerTalkback(.Close);
70 if (!state.ended) {
71 state.ended = true;
72 state.closed = true;
73 state.outerTalkback(.Close);
74 state.innerTalkback = talkbackPlaceholder;
75 }
76 }
77 }
78 }));
79}));