open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the callbacks of one concatMap subscription. */
type concatMapStateT('a) = {
  /* Outer values received while an inner source was still active. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback handed over by the outer source's Start signal. */
  mutable outerTalkback: (.talkbackT) => unit,
  /* Talkback of the current inner source; the placeholder when there is none. */
  mutable innerTalkback: (.talkbackT) => unit,
  /* True between an inner source's Start and its End. */
  mutable innerActive: bool,
  /* True once the sink has sent Close; nothing is forwarded afterwards. */
  mutable closed: bool,
  /* True once the outer source has ended or the sink has closed it. */
  mutable ended: bool
};

/* Maps every value of `source` to an inner source via `f` and forwards the
   inner sources' values to `sink` one inner source at a time. Outer values
   that arrive while an inner source is active are queued and started, in
   order, when the active inner source ends. End is forwarded to the sink
   once the outer source has ended, no inner source is active and the queue
   is empty. */
let concatMap = f => curry(source => curry(sink => {
  let state: concatMapStateT('a) = {
    inputQueue: Rebel.MutableQueue.make(),
    outerTalkback: talkbackPlaceholder,
    innerTalkback: talkbackPlaceholder,
    innerActive: false,
    closed: false,
    ended: false
  };

  /* Subscribes to one inner source and relays its signals. */
  let rec applyInnerSource = innerSource =>
    innerSource((.signal) => {
      switch (signal) {
      | End => {
        state.innerActive = false;
        state.innerTalkback = talkbackPlaceholder;

        /* Once the sink has closed, a late End must neither start a queued
           inner source nor deliver End to the closed sink. */
        if (!state.closed) {
          switch (Rebel.MutableQueue.pop(state.inputQueue)) {
          | Some(input) => applyInnerSource(f(.input))
          | None when state.ended => sink(.End)
          | None => ()
          };
        }
      }
      | Start(tb) => {
        state.innerActive = true;
        state.innerTalkback = tb;
        tb(.Pull);
      }
      | Push(x) when !state.closed => {
        sink(.Push(x));
        /* If the sink closed during the push above, innerTalkback is now the
           placeholder, so this Pull no longer reaches the closed source. */
        state.innerTalkback(.Pull);
      }
      | Push(_) => ()
      }
    });

  source((.signal) => {
    switch (signal) {
    | End when !state.ended => {
      state.ended = true;

      /* Only finish right away when nothing is running or waiting; otherwise
         the inner End handler emits End after the last inner source. */
      if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
        sink(.End);
      }
    }
    | End => ()
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      if (state.innerActive) {
        Rebel.MutableQueue.add(state.inputQueue, x);
      } else {
        applyInnerSource(f(.x));
      }

      state.outerTalkback(.Pull);
    }
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    switch (signal) {
    | Pull => if (!state.ended) state.innerTalkback(.Pull)
    | Close => {
      /* Mark the subscription closed before notifying the inner source, and
         do so even when the outer source has already ended, so that no
         signal emitted during or after teardown is forwarded. */
      state.closed = true;
      state.innerTalkback(.Close);
      state.innerTalkback = talkbackPlaceholder;

      if (!state.ended) {
        state.ended = true;
        state.outerTalkback(.Close);
      }
    }
    }
  }));
}));

/* Flattens a source of sources: concatMap with the identity function. */
let concatAll = source => concatMap((.x) => x, source);

/* Concatenates `sources`: they are turned into a source with fromArray and
   flattened with concatMap and the identity function. */
let concat = sources => {
  open Wonka_source_fromArray;
  concatMap((.x) => x, fromArray(sources));
};