open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by all callbacks of one `switchMap` subscription.

   - outerTalkback: talkback handed over by the outer source's `Start`.
   - innerTalkback: talkback of the currently attached inner source; reset to
     `talkbackPlaceholder` whenever no inner source is attached.
   - innerActive:   set on an inner `Start`, cleared on an inner `End`.
   - closed:        set once the sink has sent `Close`.
   - ended:         set once the outer source sent `End` or the sink sent
                    `Close`; no further outer values are processed afterwards. */
type switchMapStateT('a) = {
  mutable outerTalkback: (. talkbackT) => unit,
  mutable innerTalkback: (. talkbackT) => unit,
  mutable innerActive: bool,
  mutable closed: bool,
  mutable ended: bool,
};

/* Maps every value of the outer `source` to an inner source via `f` and
   forwards only the values of the most recent inner source to `sink`.

   - When the outer source pushes a new value while an inner source is still
     active, that inner source is closed before the new one is subscribed.
   - `End` is forwarded to the sink once the outer source has ended and no
     inner source is active any more.
   - Once the sink has sent `Close`, nothing is forwarded to it again.

   `f` is invoked uncurried (`f(. x)`). `curry` comes from `Wonka_helpers`;
   NOTE(review): presumably it only adapts the arity of the wrapped function,
   confirm against the helper's definition. */
let switchMap = f =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      /* Subscribes to one inner source and wires its signals to the sink. */
      let applyInnerSource = innerSource =>
        innerSource((. signal) => {
          switch (signal) {
          | End => {
              state.innerActive = false;
              state.innerTalkback = talkbackPlaceholder;
              /* `ended` is also set by the sink's `Close`, so it must be
                 combined with `closed`: a sink that has closed must never be
                 called again, not even with `End`. */
              if (state.ended && !state.closed) {
                sink(. End);
              };
            }
          | Start(tb) => {
              state.innerActive = true;
              state.innerTalkback = tb;
              /* Request the first value right away. */
              tb(. Pull);
            }
          | Push(x) when !state.closed => {
              sink(. Push(x));
              /* If the sink closed while handling the push, `innerTalkback`
                 has been reset to the placeholder by the `Close` handler. */
              state.innerTalkback(. Pull);
            }
          | Push(_) => ()
          }
        });

      source((. signal) => {
        switch (signal) {
        | End when !state.ended => {
            state.ended = true;
            /* With an inner source still active, `End` is deferred until that
               inner source ends (see the inner `End` handler). */
            if (!state.innerActive) {
              sink(. End);
            };
          }
        | End => ()
        | Start(tb) => {
            state.outerTalkback = tb;
            tb(. Pull);
          }
        | Push(x) when !state.ended => {
            /* Switch: drop the previous inner source before subscribing to
               the next one. */
            if (state.innerActive) {
              state.innerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
            };
            applyInnerSource(f(. x));
            state.outerTalkback(. Pull);
          }
        | Push(_) => ()
        }
      });

      sink(.
        Start(
          (. signal) => {
            switch (signal) {
            | Pull => state.innerTalkback(. Pull)
            | Close => {
                state.innerTalkback(. Close);
                /* Always record the close, even when the outer source has
                   already ended, so that late inner signals are dropped. */
                state.closed = true;
                if (!state.ended) {
                  state.ended = true;
                  state.outerTalkback(. Close);
                };
                /* Detach the inner talkback so repeated `Close`/`Pull`
                   signals cannot reach an inner source that was already
                   closed. */
                state.innerTalkback = talkbackPlaceholder;
              }
            }
          },
        ),
      );
    })
  );

/* Flattens a source of sources by always switching to the latest inner
   source; equivalent to `switchMap` with the identity function. */
let switchAll = source => switchMap((. x) => x, source);