Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.2 804 B view raw
open Wonka_types;

/* Mutable bookkeeping shared by the three functions returned from
   `makeSubject`: the currently registered sinks and whether `complete`
   has already been called. */
type subjectState('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable ended: bool,
};

/* Creates a subject: a record of `source`, `next` and `complete`.

   - `source(sink)` registers `sink` and immediately sends it a `Start`
     signal carrying a talkback.
   - `next(value)` sends `Push(value)` to every registered sink, unless the
     subject has already ended.
   - `complete()` marks the subject as ended and sends `End` to every
     registered sink; only the first call has any effect. */
let makeSubject = () => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  /* Delivers one signal to each sink registered at the time of the call. */
  let broadcast = signal =>
    Rebel.Array.forEach(state.sinks, observer => observer(. signal));

  /* NOTE(review): registration does not check `state.ended`, so a sink that
     subscribes after `complete` is still stored and is never sent `End`;
     confirm that this is intended. */
  let source = observer => {
    state.sinks = Rebel.Array.append(state.sinks, observer);

    /* The talkback only reacts to `Close`, which unregisters this sink;
       every other talkback signal is ignored. */
    let talkback =
      (. signal) =>
        switch (signal) {
        | Close =>
          state.sinks =
            Rebel.Array.filter(state.sinks, other => other !== observer)
        | _ => ()
        };

    observer(. Start(talkback));
  };

  let next = value =>
    if (!state.ended) {
      broadcast(Push(value));
    };

  let complete = () =>
    if (!state.ended) {
      /* Set the flag before notifying so that any `next`/`complete` call
         made while sinks handle `End` is already a no-op. */
      state.ended = true;
      broadcast(End);
    };

  {source, next, complete};
};