Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 2.0 kB view raw
1open Wonka_types; 2open Wonka_helpers; 3 4type combineStateT('a, 'b) = { 5 mutable talkbackA: (.talkbackT) => unit, 6 mutable talkbackB: (.talkbackT) => unit, 7 mutable lastValA: option('a), 8 mutable lastValB: option('b), 9 mutable gotSignal: bool, 10 mutable endCounter: int, 11 mutable ended: bool, 12}; 13 14let combine = sourceA => curry(sourceB => curry(sink => { 15 let state = { 16 talkbackA: talkbackPlaceholder, 17 talkbackB: talkbackPlaceholder, 18 lastValA: None, 19 lastValB: None, 20 gotSignal: false, 21 endCounter: 0, 22 ended: false 23 }; 24 25 sourceA((.signal) => { 26 switch (signal, state.lastValB) { 27 | (Start(tb), _) => state.talkbackA = tb 28 | (Push(a), None) => { 29 state.lastValA = Some(a); 30 state.gotSignal = false; 31 } 32 | (Push(a), Some(b)) when !state.ended => { 33 state.lastValA = Some(a); 34 state.gotSignal = false; 35 sink(.Push((a, b))); 36 } 37 | (End, _) when state.endCounter < 1 => 38 state.endCounter = state.endCounter + 1 39 | (End, _) when !state.ended => { 40 state.ended = true; 41 sink(.End); 42 } 43 | _ => () 44 } 45 }); 46 47 sourceB((.signal) => { 48 switch (signal, state.lastValA) { 49 | (Start(tb), _) => state.talkbackB = tb 50 | (Push(b), None) => { 51 state.lastValB = Some(b); 52 state.gotSignal = false; 53 } 54 | (Push(b), Some(a)) when !state.ended => { 55 state.lastValB = Some(b); 56 state.gotSignal = false; 57 sink(.Push((a, b))); 58 } 59 | (End, _) when state.endCounter < 1 => 60 state.endCounter = state.endCounter + 1 61 | (End, _) when !state.ended => { 62 state.ended = true; 63 sink(.End); 64 } 65 | _ => () 66 } 67 }); 68 69 sink(.Start((.signal) => { 70 if (!state.ended) { 71 switch (signal) { 72 | Close => { 73 state.ended = true; 74 state.talkbackA(.Close); 75 state.talkbackB(.Close); 76 } 77 | Pull when !state.gotSignal => { 78 state.gotSignal = true; 79 state.talkbackA(.signal); 80 state.talkbackB(.signal); 81 } 82 | Pull => () 83 } 84 }; 85 })); 86}));