Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 1.3 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the callbacks of a single takeUntil
   subscription. */
type takeUntilStateT = {
  /* True once the output is finished: the notifier pushed, the source
     ended, or the sink sent Close. Every callback checks it so that no
     signal is forwarded (and End is sent at most once) afterwards. */
  mutable ended: bool,
  /* Talkback received from the source's Start signal. */
  mutable sourceTalkback: (.talkbackT) => unit,
  /* Talkback received from the notifier's Start signal. */
  mutable notifierTalkback: (.talkbackT) => unit
};

/* takeUntil(notifier, source, sink)

   Forwards every Push from `source` to `sink` until `notifier` pushes its
   first value. At that point both the notifier and the source are closed
   and `sink` receives End. The value pushed by the notifier is ignored,
   and so is the notifier's own End.

   If `source` ends first, the notifier is closed and End is forwarded.
   A Pull from the sink is passed on to the source; a Close from the sink
   closes both the source and the notifier. */
let takeUntil = notifier => curry(source => curry(sink => {
  let state: takeUntilStateT = {
    ended: false,
    /* Placeholders from Wonka_helpers, replaced when each Start arrives. */
    sourceTalkback: talkbackPlaceholder,
    notifierTalkback: talkbackPlaceholder
  };

  source((.signal) => {
    switch (signal) {
    | Start(tb) => {
      state.sourceTalkback = tb;

      /* Subscribe to the notifier only once the source has started, so a
         source talkback is available to close when the notifier fires. */
      notifier((.signal) => {
        switch (signal) {
        | Start(innerTb) => {
          state.notifierTalkback = innerTb;
          /* Request the notifier's first value right away. */
          innerTb(.Pull);
        }
        /* Guarded: a notifier that keeps emitting, or that emits after the
           source ended or the sink closed, must not produce a second End
           or close the talkbacks again. */
        | Push(_) when !state.ended => {
          state.ended = true;
          state.notifierTalkback(.Close);
          state.sourceTalkback(.Close);
          sink(.End);
        }
        | Push(_) => ()
        | End => ()
        }
      });
    }
    | End when !state.ended => {
      /* Mark as ended before closing the notifier, so a notifier that
         pushes re-entrantly while being closed cannot emit End too. */
      state.ended = true;
      state.notifierTalkback(.Close);
      sink(.End);
    }
    | End => ()
    | Push(_) when !state.ended => sink(.signal)
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    if (!state.ended) {
      switch (signal) {
      | Close => {
        /* Record the cancellation; otherwise a later notifier Push would
           still send End to the closed sink and re-close both talkbacks,
           and late source Pushes would still be forwarded. */
        state.ended = true;
        state.sourceTalkback(.Close);
        state.notifierTalkback(.Close);
      }
      | Pull => state.sourceTalkback(.Pull)
      }
    };
  }));
}));