1open Wonka_types;
2open Wonka_helpers;
3
4type switchMapStateT('a) = {
5 mutable outerTalkback: (. talkbackT) => unit,
6 mutable innerTalkback: (. talkbackT) => unit,
7 mutable innerActive: bool,
8 mutable closed: bool,
9 mutable ended: bool,
10};
11
12let switchMap = f =>
13 curry(source =>
14 curry(sink => {
15 let state: switchMapStateT('a) = {
16 outerTalkback: talkbackPlaceholder,
17 innerTalkback: talkbackPlaceholder,
18 innerActive: false,
19 closed: false,
20 ended: false,
21 };
22
23 let applyInnerSource = innerSource =>
24 innerSource((. signal) =>
25 switch (signal) {
26 | End =>
27 state.innerActive = false;
28 state.innerTalkback = talkbackPlaceholder;
29 if (state.ended) {
30 sink(. End);
31 };
32 | Start(tb) =>
33 state.innerActive = true;
34 state.innerTalkback = tb;
35 tb(. Pull);
36 | Push(x) when !state.closed =>
37 sink(. Push(x));
38 state.innerTalkback(. Pull);
39 | Push(_) => ()
40 }
41 );
42
43 source((. signal) =>
44 switch (signal) {
45 | End when !state.ended =>
46 state.ended = true;
47 if (!state.innerActive) {
48 sink(. End);
49 };
50 | End => ()
51 | Start(tb) =>
52 state.outerTalkback = tb;
53 tb(. Pull);
54 | Push(x) when !state.ended =>
55 if (state.innerActive) {
56 state.innerTalkback(. Close);
57 state.innerTalkback = talkbackPlaceholder;
58 };
59 applyInnerSource(f(. x));
60 state.outerTalkback(. Pull);
61 | Push(_) => ()
62 }
63 );
64
65 sink(.
66 Start(
67 (. signal) =>
68 switch (signal) {
69 | Pull => state.innerTalkback(. Pull)
70 | Close =>
71 state.innerTalkback(. Close);
72 if (!state.ended) {
73 state.ended = true;
74 state.closed = true;
75 state.outerTalkback(. Close);
76 state.innerTalkback = talkbackPlaceholder;
77 };
78 },
79 ),
80 );
81 })
82 );
83
84let switchAll = source => switchMap((. x) => x, source);