Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.2 1.3 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type shareStateT('a) = { 5 mutable sinks: Rebel.Array.t(sinkT('a)), 6 mutable talkback: (. talkbackT) => unit, 7 mutable gotSignal: bool, 8}; 9 10let share = source => { 11 let state = { 12 sinks: Rebel.Array.makeEmpty(), 13 talkback: talkbackPlaceholder, 14 gotSignal: false, 15 }; 16 17 sink => { 18 state.sinks = Rebel.Array.append(state.sinks, sink); 19 20 if (Rebel.Array.size(state.sinks) === 1) { 21 source((. signal) => 22 switch (signal) { 23 | Push(_) => 24 state.gotSignal = false; 25 Rebel.Array.forEach(state.sinks, sink => sink(. signal)); 26 | Start(x) => state.talkback = x 27 | End => 28 Rebel.Array.forEach(state.sinks, sink => sink(. End)); 29 state.sinks = Rebel.Array.makeEmpty(); 30 } 31 ); 32 }; 33 34 sink(. 35 Start( 36 (. signal) => 37 switch (signal) { 38 | Close => 39 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink); 40 if (Rebel.Array.size(state.sinks) === 0) { 41 state.talkback(. Close); 42 }; 43 | Pull when !state.gotSignal => 44 state.gotSignal = true; 45 state.talkback(. signal); 46 | Pull => () 47 }, 48 ), 49 ); 50 }; 51};