Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 1.6 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type skipUntilStateT = { 5 mutable skip: bool, 6 mutable ended: bool, 7 mutable gotSignal: bool, 8 mutable sourceTalkback: (.talkbackT) => unit, 9 mutable notifierTalkback: (.talkbackT) => unit 10}; 11 12let skipUntil = notifier => curry(source => curry(sink => { 13 let state: skipUntilStateT = { 14 skip: true, 15 ended: false, 16 gotSignal: false, 17 sourceTalkback: talkbackPlaceholder, 18 notifierTalkback: talkbackPlaceholder 19 }; 20 21 source((.signal) => { 22 switch (signal) { 23 | Start(tb) => { 24 state.sourceTalkback = tb; 25 26 notifier((.signal) => { 27 switch (signal) { 28 | Start(innerTb) => { 29 state.notifierTalkback = innerTb; 30 innerTb(.Pull); 31 tb(.Pull); 32 } 33 | Push(_) => { 34 state.skip = false; 35 state.notifierTalkback(.Close); 36 } 37 | End => () 38 } 39 }); 40 } 41 | Push(_) when state.skip && !state.ended => state.sourceTalkback(.Pull) 42 | Push(_) when !state.ended => { 43 state.gotSignal = false; 44 sink(.signal) 45 } 46 | Push(_) => () 47 | End => { 48 if (state.skip) state.notifierTalkback(.Close); 49 state.ended = true; 50 sink(.End) 51 } 52 } 53 }); 54 55 sink(.Start((.signal) => { 56 switch (signal) { 57 | Close => { 58 if (state.skip) state.notifierTalkback(.Close); 59 state.ended = true; 60 state.sourceTalkback(.Close); 61 } 62 | Pull when !state.gotSignal && !state.ended => { 63 state.gotSignal = true; 64 state.sourceTalkback(.Pull); 65 } 66 | Pull => () 67 } 68 })); 69}));