1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks that `mergeMap` creates. */
type mergeMapStateT = {
  /* Talkback received from the outer source's `Start` signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Talkbacks of inner sources that have started and not yet ended. */
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  /* Set once the outer source has ended or the sink has closed. */
  mutable ended: bool,
};
9
/**
 * `mergeMap(f, source, sink)` maps every value pushed by the outer `source`
 * through `f` to an inner source, subscribes to that inner source right away,
 * and forwards every value the inner sources push to `sink`.
 *
 * `sink` receives `End` once the outer source has ended and no inner talkback
 * is registered any more.
 *
 * The talkback handed to `sink`:
 *  - `Pull`  is forwarded to every registered inner talkback while the outer
 *            source has not ended; afterwards it is ignored.
 *  - `Close` sends `Close` exactly once to every registered inner talkback,
 *            forgets them, and closes the outer talkback unless the outer
 *            source has already ended.
 */
let mergeMap = f =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      /* Subscribes to one inner source and wires its signals into `sink`. */
      let applyInnerSource = innerSource => {
        /* Remembered so that the `End` branch can unregister this source. */
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | End =>
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            /* The merged stream is complete only when the outer source is
               done and this was the last registered inner source. */
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          /* An empty talkback list means the sink has closed (or nothing is
             registered), so such values are dropped by the branch below. */
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          | Push(_) => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          /* With inner sources still registered, the `End` for the sink is
             emitted by the last inner source to end instead. */
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          applyInnerSource(f(. x));
          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Close every registered inner source exactly once. */
              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Close)
              );
              /* Always forget the inner talkbacks, even when the outer source
                 had already ended: a repeated `Close` must not signal them
                 again, and the `Push` guard above must drop late values. */
              state.innerTalkbacks = Rebel.Array.makeEmpty();
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
            | Pull when !state.ended =>
              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Pull)
              )
            | Pull => ()
            },
        ),
      );
    })
  );
87
/**
 * `merge(sources)` turns `sources` into a source of sources with
 * `Wonka_source_fromArray.fromArray` and flattens it with `mergeMap`,
 * using the identity function as the mapping.
 */
let merge = sources => {
  let identity = (. innerSource) => innerSource;
  mergeMap(identity, Wonka_source_fromArray.fromArray(sources));
};
91
/**
 * `mergeAll(source)` flattens a source whose values are themselves sources by
 * running it through `mergeMap` with the identity function.
 */
let mergeAll = source => {
  let identity = (. innerSource) => innerSource;
  mergeMap(identity, source);
};

/* Alias of `mergeAll`. */
let flatten = mergeAll;