1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by all callbacks of a single `mergeMap`
   subscription. */
type mergeMapStateT = {
  /* Talkback received from the outer source's Start signal. */
  mutable outerTalkback: (.talkbackT) => unit,
  /* Talkbacks of the inner sources that have started and not yet ended. */
  mutable innerTalkbacks: Rebel.Array.t((.talkbackT) => unit),
  /* True once the outer source has ended or the sink has sent Close. */
  mutable ended: bool,
};
9
/*
 * mergeMap(f, source, sink)
 *
 * Subscribes to the outer `source`; every value it pushes is passed to `f`,
 * which must return an inner source. Each inner source is subscribed to
 * immediately and its values are forwarded to `sink` as they arrive.
 *
 * - `sink` receives End only after the outer source has ended AND every
 *   started inner source has ended.
 * - A Close from `sink` closes the outer source (if it has not ended yet)
 *   and every active inner source exactly once.
 */
let mergeMap = f => curry(source => curry(sink => {
  let state: mergeMapStateT = {
    outerTalkback: talkbackPlaceholder,
    innerTalkbacks: Rebel.Array.makeEmpty(),
    ended: false
  };

  /* Subscribes to one inner source and wires its signals into `sink`. */
  let applyInnerSource = innerSource => {
    /* Filled in on Start; used to pull and to find this source's entry in
       `state.innerTalkbacks` when it ends. */
    let talkback = ref(talkbackPlaceholder);

    innerSource((.signal) => {
      switch (signal) {
      | End => {
        /* Drop this inner source from the active list (physical equality). */
        state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
        /* The last inner source finishing after the outer one ends the stream. */
        if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
          sink(.End);
        }
      }
      | Start(tb) => {
        talkback := tb;
        state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb);
        tb(.Pull);
      }
      /* An empty list means the sink closed (or nothing is active), so the
         value is dropped instead of being forwarded. */
      | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => {
        sink(.Push(x));
        talkback^(.Pull);
      }
      | Push(_) => ()
      }
    });
  };

  source((.signal) => {
    switch (signal) {
    | End when !state.ended => {
      state.ended = true;
      /* Only end the sink right away if no inner source is still active. */
      if (Rebel.Array.size(state.innerTalkbacks) === 0) {
        sink(.End);
      }
    }
    | End => ()
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      applyInnerSource(f(.x));
      state.outerTalkback(.Pull);
    }
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    switch (signal) {
    | Close => {
      /* Snapshot and clear the active list BEFORE notifying anyone: every
         inner talkback then receives Close exactly once (previously it was
         sent twice when the outer source was still running), and any late
         inner Push is dropped by the size guard above, including when the
         outer source had already ended. */
      let activeTalkbacks = state.innerTalkbacks;
      state.innerTalkbacks = Rebel.Array.makeEmpty();
      if (!state.ended) {
        state.ended = true;
        state.outerTalkback(.Close);
      };
      Rebel.Array.forEach(activeTalkbacks, talkback => talkback(.Close));
    }
    | Pull when !state.ended =>
      Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull));
    | Pull => ()
    }
  }));
}));
80
/* Merges an array of sources into one source: the array is turned into a
   source of sources via `fromArray` and flattened with `mergeMap` using the
   identity mapping. */
let merge = sources =>
  Wonka_source_fromArray.(
    mergeMap((.innerSource) => innerSource, fromArray(sources))
  );
85
/* Flattens a source of sources by running `mergeMap` with the identity
   mapping, so every inner source's values are forwarded as they arrive. */
let mergeAll = source => {
  let identity = (.innerSource) => innerSource;
  mergeMap(identity, source);
};

/* Alias of `mergeAll`. */
let flatten = mergeAll;