Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.2 2.3 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the two source handlers and the talkback
   that `combine` hands to its sink. */
type combineStateT('a, 'b) = {
  /* Talkbacks received through each source's `Start` signal. */
  mutable talkbackA: (. talkbackT) => unit,
  mutable talkbackB: (. talkbackT) => unit,
  /* Most recent value pushed by each source; `None` until the first push. */
  mutable lastValA: option('a),
  mutable lastValB: option('b),
  /* True while a forwarded `Pull` has not yet been answered by a push. */
  mutable gotSignal: bool,
  /* Number of `End` signals counted so far (only ever raised from 0 to 1). */
  mutable endCounter: int,
  /* Set once `End` was sent to the sink or the sink sent `Close`. */
  mutable ended: bool,
};

/* Combines two sources into one source of pairs.

   Each time either source pushes a value while the other source has already
   pushed at least once, the sink receives `Push((a, b))` built from the new
   value and the other side's latest value. The first `End` received from the
   sources is only counted; the next one sends `End` to the sink. A `Close`
   from the sink is forwarded to both sources, and a `Pull` is forwarded to
   both sources unless an earlier `Pull` is still unanswered. */
let combine = sourceA =>
  curry(sourceB =>
    curry(sink => {
      let st = {
        talkbackA: talkbackPlaceholder,
        talkbackB: talkbackPlaceholder,
        lastValA: None,
        lastValB: None,
        gotSignal: false,
        endCounter: 0,
        ended: false,
      };

      sourceA((. signal) =>
        switch (signal) {
        | Start(talkback) => st.talkbackA = talkback
        | Push(valA) =>
          switch (st.lastValB) {
          | None =>
            /* Nothing to pair with yet: just remember the value. */
            st.lastValA = Some(valA);
            st.gotSignal = false;
          | Some(valB) when !st.ended =>
            st.lastValA = Some(valA);
            st.gotSignal = false;
            sink(. Push((valA, valB)));
          | Some(_) => ()
          }
        | End when st.endCounter < 1 => st.endCounter = st.endCounter + 1
        | End when !st.ended =>
          st.ended = true;
          sink(. End);
        | _ => ()
        }
      );

      sourceB((. signal) =>
        switch (signal) {
        | Start(talkback) => st.talkbackB = talkback
        | Push(valB) =>
          switch (st.lastValA) {
          | None =>
            /* Nothing to pair with yet: just remember the value. */
            st.lastValB = Some(valB);
            st.gotSignal = false;
          | Some(valA) when !st.ended =>
            st.lastValB = Some(valB);
            st.gotSignal = false;
            sink(. Push((valA, valB)));
          | Some(_) => ()
          }
        | End when st.endCounter < 1 => st.endCounter = st.endCounter + 1
        | End when !st.ended =>
          st.ended = true;
          sink(. End);
        | _ => ()
        }
      );

      /* Talkback given to the sink; it ignores every signal once ended. */
      let talkback =
        (. signal) =>
          switch (signal) {
          | _ when st.ended => ()
          | Close =>
            st.ended = true;
            st.talkbackA(. Close);
            st.talkbackB(. Close);
          | Pull when !st.gotSignal =>
            st.gotSignal = true;
            st.talkbackA(. Pull);
            st.talkbackB(. Pull);
          | Pull => ()
          };

      sink(. Start(talkback));
    })
  );