open Wonka_types; open Wonka_helpers; type skipUntilStateT = { mutable skip: bool, mutable ended: bool, mutable gotSignal: bool, mutable sourceTalkback: (. talkbackT) => unit, mutable notifierTalkback: (. talkbackT) => unit, }; let skipUntil = notifier => curry(source => curry(sink => { let state: skipUntilStateT = { skip: true, ended: false, gotSignal: false, sourceTalkback: talkbackPlaceholder, notifierTalkback: talkbackPlaceholder, }; source((. signal) => switch (signal) { | Start(tb) => state.sourceTalkback = tb; notifier((. signal) => switch (signal) { | Start(innerTb) => state.notifierTalkback = innerTb; innerTb(. Pull); tb(. Pull); | Push(_) => state.skip = false; state.notifierTalkback(. Close); | End => () } ); | Push(_) when state.skip && !state.ended => state.sourceTalkback(. Pull) | Push(_) when !state.ended => state.gotSignal = false; sink(. signal); | Push(_) => () | End => if (state.skip) { state.notifierTalkback(. Close); }; state.ended = true; sink(. End); } ); sink(. Start( (. signal) => switch (signal) { | Close => if (state.skip) { state.notifierTalkback(. Close); }; state.ended = true; state.sourceTalkback(. Close); | Pull when !state.gotSignal && !state.ended => state.gotSignal = true; state.sourceTalkback(. Pull); | Pull => () }, ), ); }) );