Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1open Wonka_types; 2open Wonka_helpers; 3 4type takeStateT = { 5 mutable taken: int, 6 mutable talkback: (. talkbackT) => unit, 7}; 8 9let take = max => 10 curry(source => 11 curry(sink => { 12 let state: takeStateT = {taken: 0, talkback: talkbackPlaceholder}; 13 14 source((. signal) => 15 switch (signal) { 16 | Start(tb) => state.talkback = tb 17 | Push(_) when state.taken < max => 18 state.taken = state.taken + 1; 19 sink(. signal); 20 21 if (state.taken === max) { 22 sink(. End); 23 state.talkback(. Close); 24 }; 25 | Push(_) => () 26 | End when state.taken < max => 27 state.taken = max; 28 sink(. End); 29 | End => () 30 } 31 ); 32 33 sink(. 34 Start( 35 (. signal) => 36 if (state.taken < max) { 37 switch (signal) { 38 | Pull => state.talkback(. Pull) 39 | Close => 40 state.taken = max; 41 state.talkback(. Close); 42 }; 43 }, 44 ), 45 ); 46 }) 47 );