Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 914 B view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable per-subscription state of the `take` operator. */
type takeStateT = {
  /* Number of Push signals forwarded to the sink so far. */
  mutable taken: int,
  /* Talkback received from the source's Start signal; holds
     `talkbackPlaceholder` until the source has started. */
  mutable talkback: (.talkbackT) => unit
};

/* take(max): forwards at most `max` Push signals from `source` to `sink`,
   then sends End to the sink and Close to the source.

   - max: maximum number of values to forward. When `max <= 0` nothing is
     forwarded: the source is closed as soon as it starts and the sink
     receives Start immediately followed by End.
   - source / sink: supplied through `curry` (presumably from Wonka_helpers —
     its exact semantics are not visible in this file).

   The sink receives End at most once, and the source receives Close at most
   once, even when the sink re-enters the operator (pulls or closes) from
   inside its own Push handler. */
let take = max => curry(source => curry(sink => {
  let state: takeStateT = {
    taken: 0,
    talkback: talkbackPlaceholder
  };

  /* True once this operator has terminated, either because End was sent to
     the sink or because the sink sent Close. Kept as a local ref so that
     takeStateT stays unchanged. With max <= 0 the operator is finished
     before it begins. */
  let ended = ref(max <= 0);

  source((.signal) => {
    switch (signal) {
    /* Nothing will ever be taken: release the source right away. */
    | Start(tb) when max <= 0 => tb(.Close)
    | Start(tb) => state.talkback = tb
    | Push(_) when !ended^ && state.taken < max => {
      state.taken = state.taken + 1;
      sink(.signal);

      /* The sink may have re-entered during the call above: it can have
         closed us, or pulled a value that completed the quota in a nested
         Push. Re-check `ended` so End/Close are not sent a second time. */
      if (!ended^ && state.taken >= max) {
        ended := true;
        sink(.End);
        state.talkback(.Close);
      };
    }
    | Push(_) => ()
    | End when !ended^ => {
      ended := true;
      sink(.End);
    }
    | End => ()
    }
  });

  sink(.Start((.signal) => {
    if (!ended^) {
      switch (signal) {
      /* Once the quota is reached any further value would be dropped, so
         the Pull is not forwarded. */
      | Pull when state.taken < max => state.talkback(.Pull)
      | Pull => ()
      | Close => {
        ended := true;
        state.talkback(.Close);
      }
      }
    };
  }));

  /* take(0) (or a negative max) behaves like an empty source: complete the
     sink right after it has been started. */
  if (max <= 0) {
    sink(.End);
  };
}));