open Wonka_types; open Wonka_helpers; type shareStateT('a) = { mutable sinks: Rebel.Array.t(sinkT('a)), mutable talkback: (. talkbackT) => unit, mutable gotSignal: bool, }; let share = source => { let state = { sinks: Rebel.Array.makeEmpty(), talkback: talkbackPlaceholder, gotSignal: false, }; sink => { state.sinks = Rebel.Array.append(state.sinks, sink); if (Rebel.Array.size(state.sinks) === 1) { source((. signal) => switch (signal) { | Push(_) => state.gotSignal = false; Rebel.Array.forEach(state.sinks, sink => sink(. signal)); | Start(x) => state.talkback = x | End => Rebel.Array.forEach(state.sinks, sink => sink(. End)); state.sinks = Rebel.Array.makeEmpty(); } ); }; sink(. Start( (. signal) => switch (signal) { | Close => state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); if (Rebel.Array.size(state.sinks) === 0) { state.talkback(. Close); }; | Pull when !state.gotSignal => state.gotSignal = true; state.talkback(. signal); | Pull => () }, ), ); }; };