1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks of a single `concatMap` subscription. */
type concatMapStateT('a) = {
  /* Outer values that arrived while an inner source was active; they are
     popped and mapped one at a time when the running inner source ends. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback received from the outer source's Start signal. */
  mutable outerTalkback: (.talkbackT) => unit,
  /* Talkback of the inner source currently running; reset to the placeholder
     when that inner source ends. */
  mutable innerTalkback: (.talkbackT) => unit,
  /* True while an inner source is running; decides whether a new outer value
     is queued or mapped immediately. */
  mutable innerActive: bool,
  /* Set from the sink's Close handler; pushes from inner sources are dropped
     once it is true. */
  mutable closed: bool,
  /* Set when the outer source emits End or when the sink closes; outer
     pushes are ignored once it is true. */
  mutable ended: bool
};
12
/* concatMap(f, source): maps every value of the outer `source` to an inner
   source with `f` and forwards the values of those inner sources to the sink
   strictly one inner source after the other.

   - Outer values that arrive while an inner source is still running are
     buffered in `inputQueue` and mapped once the running inner source ends.
   - The sink receives End only after the outer source has ended, the queue
     is drained and the last inner source has ended.
   - A Close from the sink closes the running inner source and, if it has
     not ended yet, the outer source; nothing is forwarded afterwards. */
let concatMap = f => curry(source => curry(sink => {
  let state: concatMapStateT('a) = {
    inputQueue: Rebel.MutableQueue.make(),
    outerTalkback: talkbackPlaceholder,
    innerTalkback: talkbackPlaceholder,
    innerActive: false,
    closed: false,
    ended: false
  };

  let rec applyInnerSource = innerSource => {
    /* Mark the slot as taken before subscribing: an inner source that sends
       its Start later must still block following outer values (they get
       queued) and must keep an outer End from finishing the sink early. */
    state.innerActive = true;

    innerSource((.signal) => {
      switch (signal) {
      /* The sink closed before this inner source started: close it right away. */
      | Start(tb) when state.closed => tb(.Close)
      | Start(tb) => {
        state.innerTalkback = tb;
        tb(.Pull);
      }
      | Push(x) when !state.closed => {
        sink(.Push(x));
        /* If the sink closed during the push, innerTalkback is the
           placeholder again and this pull is a no-op. */
        state.innerTalkback(.Pull);
      }
      | Push(_) => ()
      /* After a Close neither a queued source may start nor may End be sent. */
      | End when state.closed => ()
      | End => {
        state.innerActive = false;
        state.innerTalkback = talkbackPlaceholder;

        switch (Rebel.MutableQueue.pop(state.inputQueue)) {
        | Some(input) => applyInnerSource(f(.input))
        | None when state.ended => sink(.End)
        | None => ()
        };
      }
      }
    });
  };

  /* Hand the talkback to the sink BEFORE subscribing to the outer source so
     that Start is always the first signal the sink observes, even when the
     outer and inner sources emit synchronously. */
  sink(.Start((.signal) => {
    switch (signal) {
    /* Pulls are forwarded to the running inner source while the outer
       source has not ended. */
    | Pull => if (!state.ended) {
        state.innerTalkback(.Pull);
      }
    /* A repeated Close must not reach the sources a second time. */
    | Close when state.closed => ()
    | Close => {
      let outerEnded = state.ended;
      let innerTb = state.innerTalkback;

      /* Flip the flags first so that anything emitted re-entrantly while
         the sources are being closed is already ignored. */
      state.closed = true;
      state.ended = true;
      state.innerTalkback = talkbackPlaceholder;

      innerTb(.Close);
      /* An outer source that already ended on its own is not closed again. */
      if (!outerEnded) {
        state.outerTalkback(.Close);
      };
    }
    }
  }));

  source((.signal) => {
    switch (signal) {
    /* The sink closed while handling Start (or before the outer source
       started): close the outer source instead of pulling from it. */
    | Start(tb) when state.closed => tb(.Close)
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      if (state.innerActive) {
        Rebel.MutableQueue.add(state.inputQueue, x);
      } else {
        applyInnerSource(f(.x));
      };

      /* Keep draining the outer source eagerly, unless the sink closed
         while the value above was being processed. */
      if (!state.ended) {
        state.outerTalkback(.Pull);
      };
    }
    | Push(_) => ()
    | End when !state.ended => {
      state.ended = true;
      /* Only finish immediately when nothing is running or waiting;
         otherwise the last inner source's End completes the sink. */
      if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
        sink(.End);
      };
    }
    | End => ()
    }
  });
}));
90
/* concatAll(source): flattens a source of sources by running `concatMap`
   with the identity function, i.e. every emitted inner source is played in
   full before the next one starts. */
let concatAll = source => {
  let identity = (.inner) => inner;
  concatMap(identity, source);
};
92
/* concat(sources): turns the array `sources` into a source of sources with
   `fromArray` and concatenates them in array order via `concatMap` with the
   identity function. */
let concat = sources => {
  let outer = Wonka_source_fromArray.fromArray(sources);
  concatMap((.inner) => inner, outer);
};