Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.3 1.2 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type shareStateT('a) = { 5 mutable sinks: Rebel.Array.t(sinkT('a)), 6 mutable talkback: (.talkbackT) => unit, 7 mutable gotSignal: bool 8}; 9 10let share = source => { 11 let state = { 12 sinks: Rebel.Array.makeEmpty(), 13 talkback: talkbackPlaceholder, 14 gotSignal: false 15 }; 16 17 sink => { 18 state.sinks = Rebel.Array.append(state.sinks, sink); 19 20 if (Rebel.Array.size(state.sinks) === 1) { 21 source((.signal) => { 22 switch (signal) { 23 | Push(_) => { 24 state.gotSignal = false; 25 Rebel.Array.forEach(state.sinks, sink => sink(.signal)); 26 } 27 | Start(x) => state.talkback = x 28 | End => { 29 Rebel.Array.forEach(state.sinks, sink => sink(.End)); 30 state.sinks = Rebel.Array.makeEmpty(); 31 } 32 } 33 }); 34 }; 35 36 sink(.Start((.signal) => { 37 switch (signal) { 38 | Close => { 39 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); 40 if (Rebel.Array.size(state.sinks) === 0) { 41 state.talkback(.Close); 42 }; 43 } 44 | Pull when !state.gotSignal => { 45 state.gotSignal = true; 46 state.talkback(.signal); 47 } 48 | Pull => () 49 } 50 })); 51 } 52};