Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 1.8 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the callbacks that make up `switchMap`.
 *
 * outerTalkback  talkback handed over by the outer source's `Start` signal.
 * innerTalkback  talkback of the current inner source; reset to
 *                `talkbackPlaceholder` whenever no inner talkback is held.
 * innerActive    set when an inner source sends `Start`, cleared when an
 *                inner source sends `End`.
 * closed         set once the sink has sent `Close`.
 * ended          set once the outer source has sent `End` or the sink has
 *                sent `Close`.
 */
type switchMapStateT('a) = {
  mutable outerTalkback: (.talkbackT) => unit,
  mutable innerTalkback: (.talkbackT) => unit,
  mutable innerActive: bool,
  mutable closed: bool,
  mutable ended: bool
};

/* switchMap(f, source, sink)
 *
 * Passes every value pushed by the outer `source` through `f` to obtain an
 * inner source, subscribes to it and forwards its `Push` signals to `sink`.
 * When the outer source pushes again while an inner source is active, the
 * previous inner source is sent `Close` before the new one is subscribed.
 * `End` is forwarded to the sink once the outer source has ended and no inner
 * source is active any more. After the sink sends `Close`, both sources are
 * closed and nothing further is forwarded.
 */
let switchMap = f => curry(source => curry(sink => {
  let state: switchMapStateT('a) = {
    outerTalkback: talkbackPlaceholder,
    innerTalkback: talkbackPlaceholder,
    innerActive: false,
    closed: false,
    ended: false
  };

  /* Subscribes to one inner source and wires its signals to the sink. */
  let applyInnerSource = innerSource =>
    innerSource((.signal) => {
      switch (signal) {
      | End => {
        state.innerActive = false;
        state.innerTalkback = talkbackPlaceholder;
        /* Only complete the sink when the outer source is done as well, and
           never after the sink itself has already closed the stream. */
        if (state.ended && !state.closed) {
          sink(.End);
        };
      }
      | Start(tb) => {
        state.innerActive = true;
        state.innerTalkback = tb;
        tb(.Pull);
      }
      | Push(x) when !state.closed => {
        sink(.Push(x));
        /* The sink may have closed during the call above; in that case the
           talkback has been swapped for the placeholder. */
        state.innerTalkback(.Pull);
      }
      | Push(_) => ()
      }
    });

  source((.signal) => {
    switch (signal) {
    | End when !state.ended => {
      state.ended = true;
      /* With an inner source still running, its `End` completes the sink. */
      if (!state.innerActive) {
        sink(.End);
      };
    }
    | End => ()
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      /* Switch: close the previous inner source before starting the next. */
      if (state.innerActive) {
        state.innerTalkback(.Close);
        state.innerTalkback = talkbackPlaceholder;
      };
      applyInnerSource(f(.x));
      state.outerTalkback(.Pull);
    }
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    switch (signal) {
    | Pull => state.innerTalkback(.Pull)
    | Close => {
      state.innerTalkback(.Close);
      /* Drop the inner talkback and mark the sink as closed unconditionally:
         the outer source may have ended earlier while an inner source was
         still running, and that inner source must not be pulled from or
         forwarded to the sink after this point. */
      state.innerTalkback = talkbackPlaceholder;
      state.closed = true;
      if (!state.ended) {
        state.ended = true;
        state.outerTalkback(.Close);
      };
    }
    }
  }));
}));