open Wonka_types; open Wonka_helpers; type mergeMapStateT = { mutable outerTalkback: (.talkbackT) => unit, mutable innerTalkbacks: Rebel.Array.t((.talkbackT) => unit), mutable ended: bool }; let mergeMap = f => curry(source => curry(sink => { let state: mergeMapStateT = { outerTalkback: talkbackPlaceholder, innerTalkbacks: Rebel.Array.makeEmpty(), ended: false }; let applyInnerSource = innerSource => { let talkback = ref(talkbackPlaceholder); innerSource((.signal) => { switch (signal) { | End => { state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^); if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) { sink(.End); } } | Start(tb) => { talkback := tb; state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb); tb(.Pull); } | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => { sink(.Push(x)); talkback^(.Pull); } | Push(_) => () } }); }; source((.signal) => { switch (signal) { | End when !state.ended => { state.ended = true; if (Rebel.Array.size(state.innerTalkbacks) === 0) { sink(.End); } } | End => () | Start(tb) => { state.outerTalkback = tb; tb(.Pull); } | Push(x) when !state.ended => { applyInnerSource(f(.x)); state.outerTalkback(.Pull); } | Push(_) => () } }); sink(.Start((.signal) => { switch (signal) { | Close => { Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close)); if (!state.ended) { state.ended = true; state.outerTalkback(.Close); Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close)); state.innerTalkbacks = Rebel.Array.makeEmpty(); } } | Pull when !state.ended => Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull)); | Pull => () } })); })); let merge = sources => { open Wonka_source_fromArray; mergeMap((.x) => x, fromArray(sources)); }; let mergeAll = source => mergeMap((.x) => x, source); let flatten = mergeAll;