open Wonka_types;

/* Mutable bookkeeping shared by the three functions that `makeSubject`
   returns.
   - `sinks`: the sinks currently subscribed through `source`.
   - `ended`: flipped to `true` by the first `complete()` call; once set,
     `next` and `complete` become no-ops. */
type subjectState('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable ended: bool,
};

/* Creates a subject: a record `{source, next, complete}` whose functions
   all close over one private `subjectState`.
   - `source(sink)` registers `sink` and immediately sends it a `Start`
     signal carrying a talkback; sending `Close` through that talkback
     unregisters the sink.
   - `next(value)` sends `Push(value)` to every registered sink, unless the
     subject has already completed.
   - `complete()` marks the subject as ended and sends `End` to every
     registered sink; only the first call has any effect. */
let makeSubject = () => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  /* Delivers one signal to each sink registered at the time of the call. */
  let broadcast = signal =>
    Rebel.Array.forEach(state.sinks, subscriber => subscriber(. signal));

  let source = sink => {
    /* Register first, so the sink is already in the list when it receives
       its `Start` signal. */
    state.sinks = Rebel.Array.append(state.sinks, sink);

    /* Talkback handed to the sink: only `Close` is acted upon, every other
       talkback signal is ignored. Removal is by physical identity. */
    let talkback =
      (. signal) =>
        switch (signal) {
        | Close =>
          state.sinks =
            Rebel.Array.filter(state.sinks, other => other !== sink)
        | _ => ()
        };

    sink(. Start(talkback));
  };

  /* Values pushed after completion are silently dropped. */
  let next = value => state.ended ? () : broadcast(Push(value));

  let complete = () =>
    if (state.ended) {
      ();
    } else {
      /* Set the flag before notifying, so a `next`/`complete` call made
         while sinks handle `End` is already ignored. */
      state.ended = true;
      broadcast(End);
    };

  {source, next, complete};
};