Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.2 1.9 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the source, notifier and sink callbacks of
   a single `skipUntil` subscription. */
type skipUntilStateT = {
  /* true until the notifier pushes its first value */
  mutable skip: bool,
  /* true once the source has ended or the sink has closed */
  mutable ended: bool,
  /* true while a Pull forwarded on behalf of the sink is still unanswered */
  mutable gotSignal: bool,
  mutable sourceTalkback: (. talkbackT) => unit,
  mutable notifierTalkback: (. talkbackT) => unit,
};

/* skipUntil(notifier)(source)(sink)

   Drops every value pushed by `source` until `notifier` pushes for the
   first time; from then on source values are forwarded to `sink`.

   - The notifier is subscribed when the source starts. It is closed as soon
     as it pushes once, or when the source ends / the sink closes while
     values are still being skipped.
   - While skipping, each dropped source value triggers another Pull on the
     source.
   - A Pull from the sink is forwarded to the source only when no earlier
     forwarded Pull is still outstanding and the stream has not ended. */
let skipUntil = notifier =>
  curry(source =>
    curry(sink => {
      let st: skipUntilStateT = {
        skip: true,
        ended: false,
        gotSignal: false,
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
      };

      /* The notifier only needs tearing down while we are still waiting on
         it; once it has pushed, it has already been closed. */
      let closeNotifierIfSkipping = () =>
        if (st.skip) {
          st.notifierTalkback(. Close);
        };

      source((. signal) =>
        switch (signal) {
        | Start(srcTb) =>
          st.sourceTalkback = srcTb;

          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(notifierTb) =>
              st.notifierTalkback = notifierTb;
              /* Kick off both streams: notifier first, then the source. */
              notifierTb(. Pull);
              srcTb(. Pull);
            | Push(_) =>
              /* First notifier value: stop skipping and drop the notifier. */
              st.skip = false;
              st.notifierTalkback(. Close);
            | End => ()
            }
          );
        | Push(_) =>
          if (st.ended) {
            /* Values arriving after the end are ignored. */
            ();
          } else if (st.skip) {
            /* Still skipping: discard the value and request the next one. */
            st.sourceTalkback(. Pull);
          } else {
            /* The outstanding Pull has been answered; pass the value on. */
            st.gotSignal = false;
            sink(. signal);
          }
        | End =>
          closeNotifierIfSkipping();
          st.ended = true;
          sink(. End);
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            | Close =>
              closeNotifierIfSkipping();
              st.ended = true;
              st.sourceTalkback(. Close);
            | Pull =>
              /* Forward at most one Pull at a time, and none after the end. */
              if (!(st.gotSignal || st.ended)) {
                st.gotSignal = true;
                st.sourceTalkback(. Pull);
              }
            },
        ),
      );
    })
  );