Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v2.0.1 730 B view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type subscribeStateT = { 5 mutable talkback: (.talkbackT) => unit, 6 mutable ended: bool 7}; 8 9let subscribe = f => curry(source => { 10 let state: subscribeStateT = { 11 talkback: talkbackPlaceholder, 12 ended: false 13 }; 14 15 source((.signal) => { 16 switch (signal) { 17 | Start(x) => { 18 state.talkback = x; 19 x(.Pull); 20 } 21 | Push(x) when !state.ended => { 22 f(.x); 23 state.talkback(.Pull); 24 } 25 | Push(_) => () 26 | End => state.ended = true; 27 } 28 }); 29 30 { 31 unsubscribe: () => 32 if (!state.ended) { 33 state.ended = true; 34 state.talkback(.Close); 35 } 36 } 37}); 38 39let forEach = f => curry(source => { 40 ignore(subscribe(f, source)); 41});