Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 767 B view raw
open Wonka_types;

/* Mutable bookkeeping shared by the closures of a single subject. */
type subjectState('a) = {
  /* Sinks currently attached through `source`. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Set to true by the first call to `complete`. */
  mutable ended: bool,
};

/* Builds a subject and returns it as a record of three closures:
 *
 * - `source`: attaches a sink, then hands it a `Start` carrying a talkback.
 *   When that talkback receives `Close`, the sink is removed again
 *   (sinks are compared by physical identity).
 * - `next`: sends `Push(value)` to every attached sink; does nothing once
 *   the subject has ended.
 * - `complete`: marks the subject as ended and sends `End` to every attached
 *   sink; only the first call has any effect.
 *
 * NOTE(review): `source` does not look at `state.ended`, so a sink attached
 * after `complete` is stored and receives `Start` but never `End` -- confirm
 * that this is intended.
 */
let makeSubject = () => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = subscriber => {
    state.sinks = Rebel.Array.append(state.sinks, subscriber);

    /* Talkback given to the subscriber; only `Close` is acted upon. */
    let talkback =
      (. signal) =>
        switch (signal) {
        | Close =>
          state.sinks =
            Rebel.Array.filter(state.sinks, other => other !== subscriber)
        | _ => ()
        };

    subscriber(. Start(talkback));
  };

  let next = payload =>
    if (state.ended) {
      ();
    } else {
      Rebel.Array.forEach(state.sinks, target => target(. Push(payload)));
    };

  let complete = () =>
    if (state.ended) {
      ();
    } else {
      /* Flip the flag first so later `next`/`complete` calls are ignored. */
      state.ended = true;
      Rebel.Array.forEach(state.sinks, target => target(. End));
    };

  { source, next, complete }
};