1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by all callbacks of one concatMap
   subscription. A fresh record is created per sink. */
type concatMapStateT('a) = {
  /* Talkback handed over by the outer source in its Start signal;
     a placeholder until that signal arrives. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Talkback of the inner source that is currently running; reset to the
     placeholder once that inner source ends or the sink closes. */
  mutable innerTalkback: (. talkbackT) => unit,
  /* Outer values received while an inner source was still active. They are
     popped one at a time as each inner source ends. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* True between an inner source's Start and its End. */
  mutable innerActive: bool,
  /* Set when the outer source has sent End, or when the sink has closed. */
  mutable ended: bool,
  /* Set when the sink has sent Close; inner pushes are dropped afterwards. */
  mutable closed: bool,
};
12
/* concatMap(f, source, sink)

   Maps every value pushed by the outer `source` to an inner source via `f`
   and forwards the values of those inner sources to `sink`, one inner
   source at a time:

   - while an inner source is active, further outer values are buffered in
     `inputQueue` and only mapped once the active inner source has ended;
   - `End` is sent to the sink once the outer source has ended, no inner
     source is active and the queue is empty;
   - `Close` from the sink tears down both the active inner source and the
     outer source. Teardown is idempotent, and after it no further signal is
     forwarded to the sink and no queued value is turned into a new inner
     source.

   `f` is called with the uncurried convention (`f(. x)`). `curry` and
   `talkbackPlaceholder` are not defined in this file (presumably they come
   from Wonka_helpers). */
let concatMap = f =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      /* Subscribes to one inner source and relays its signals to the sink. */
      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          | End =>
            state.innerActive = false;
            state.innerTalkback = talkbackPlaceholder;

            /* After the sink has closed, an inner End (possibly emitted
               synchronously in response to our own Close) must neither
               start the next queued inner source nor reach the sink. */
            if (!state.closed) {
              switch (Rebel.MutableQueue.pop(state.inputQueue)) {
              | Some(input) => applyInnerSource(f(. input))
              | None when state.ended => sink(. End)
              | None => ()
              };
            };
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            /* Request the first value of the inner source right away. */
            tb(. Pull);
          | Push(x) when !state.closed =>
            sink(. Push(x));
            /* Keep the inner source going: ask for its next value. */
            state.innerTalkback(. Pull);
          | Push(_) => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          /* If an inner source is still running or values are queued, the
             sink's End is deferred to the inner End handler above. */
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };

          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended) {
                state.innerTalkback(. Pull);
              }
            | Close =>
              /* Idempotent teardown. The flags are updated BEFORE the
                 sources are notified, so that anything they emit
                 synchronously in response already observes the closed
                 state. */
              if (!state.closed) {
                let outerEnded = state.ended;
                state.closed = true;
                state.ended = true;

                let innerTalkback = state.innerTalkback;
                state.innerTalkback = talkbackPlaceholder;
                innerTalkback(. Close);

                /* The outer source only needs closing if it has not
                   already ended on its own. */
                if (!outerEnded) {
                  state.outerTalkback(. Close);
                };
              }
            },
        ),
      );
    })
  );
93
/* Flattens a source of sources: each inner source emitted by `source` is
   passed unchanged to concatMap, i.e. concatMap with an identity mapper. */
let concatAll = source => {
  let identity = (. innerSource) => innerSource;
  concatMap(identity, source);
};
95
/* Concatenates an array of sources: the array is turned into a source of
   sources with fromArray and then flattened through concatMap using an
   identity mapper. */
let concat = sources => {
  let sourceOfSources = Wonka_source_fromArray.fromArray(sources);
  concatMap((. innerSource) => innerSource, sourceOfSources);
};