Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.4 1.3 kB view raw
1open Wonka_types; 2 3type sampleStateT('a) = { 4 mutable ended: bool, 5 mutable value: option('a), 6 mutable sourceTalkback: (.talkbackT) => unit, 7 mutable notifierTalkback: (.talkbackT) => unit 8}; 9 10let sample = notifier => curry(source => curry(sink => { 11 let state = { 12 ended: false, 13 value: None, 14 sourceTalkback: (._: talkbackT) => (), 15 notifierTalkback: (._: talkbackT) => () 16 }; 17 18 source((.signal) => { 19 switch (signal) { 20 | Start(tb) => state.sourceTalkback = tb 21 | End => { 22 state.ended = true; 23 state.notifierTalkback(.Close); 24 sink(.End); 25 } 26 | Push(x) => state.value = Some(x) 27 } 28 }); 29 30 notifier((.signal) => { 31 switch (signal, state.value) { 32 | (Start(tb), _) => state.notifierTalkback = tb 33 | (End, _) => { 34 state.ended = true; 35 state.sourceTalkback(.Close); 36 sink(.End); 37 } 38 | (Push(_), Some(x)) when !state.ended => { 39 state.value = None; 40 sink(.Push(x)); 41 } 42 | (Push(_), _) => () 43 } 44 }); 45 46 sink(.Start((.signal) => { 47 switch (signal) { 48 | Pull => { 49 state.sourceTalkback(.Pull); 50 state.notifierTalkback(.Pull); 51 } 52 | Close => { 53 state.ended = true; 54 state.sourceTalkback(.Close); 55 state.notifierTalkback(.Close); 56 } 57 } 58 })); 59}));