Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.1 2.2 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by all callbacks of one `switchMap` subscription. */
type switchMapStateT('a) = {
  /* Talkback received from the outer source's `Start` signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Talkback of the most recently started inner source, or the placeholder
     when none is stored. */
  mutable innerTalkback: (. talkbackT) => unit,
  /* Set on an inner `Start`, cleared on an inner `End`. */
  mutable innerActive: bool,
  /* Set when the sink sends `Close` before the outer source has ended;
     inner values are dropped from then on. */
  mutable closed: bool,
  /* Set on the outer source's first `End` or on the sink's `Close`. */
  mutable ended: bool,
};

/* Maps every value of the outer `source` to an inner source via `f` and
   forwards the values of the latest inner source to `sink`. When a new outer
   value arrives while an inner source is active, that inner source is sent
   `Close` before the new one is subscribed to. `End` is forwarded to the sink
   once the outer source has ended and no inner source is active. */
let switchMap = f =>
  curry(source =>
    curry(sink => {
      let state: switchMapStateT('a) = {
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      /* Subscribes to one inner source and relays its signals. */
      let subscribeInner = inner =>
        inner((. signal) =>
          switch (signal) {
          | Start(talkback) =>
            state.innerActive = true;
            state.innerTalkback = talkback;
            talkback(. Pull);
          | Push(value) =>
            /* Values arriving after the sink closed are silently dropped. */
            if (!state.closed) {
              sink(. Push(value));
              /* Immediately request the next value from the inner source. */
              state.innerTalkback(. Pull);
            }
          | End =>
            state.innerActive = false;
            state.innerTalkback = talkbackPlaceholder;
            /* The inner source was the last thing keeping the stream open. */
            if (state.ended) {
              sink(. End);
            };
          }
        );

      /* Talkback handed to the sink; defined up front, passed on at the end. */
      let sinkTalkback =
        (. signal) =>
          switch (signal) {
          | Pull => state.innerTalkback(. Pull)
          | Close =>
            /* The stored inner talkback is always notified, even when the
               outer source has already ended. */
            state.innerTalkback(. Close);
            if (!state.ended) {
              state.closed = true;
              state.ended = true;
              state.outerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
            };
          };

      source((. signal) =>
        switch (signal) {
        | Start(talkback) =>
          state.outerTalkback = talkback;
          talkback(. Pull);
        | Push(value) =>
          /* Outer values are ignored once the stream has ended or closed. */
          if (!state.ended) {
            /* Switch: close the previous inner source before starting anew. */
            if (state.innerActive) {
              state.innerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
            };
            subscribeInner(f(. value));
            state.outerTalkback(. Pull);
          }
        | End =>
          /* Only the first outer `End` is acted upon. */
          if (!state.ended) {
            state.ended = true;
            /* With an inner source still running, its `End` completes the sink. */
            if (!state.innerActive) {
              sink(. End);
            };
          }
        }
      );

      sink(. Start(sinkTalkback));
    })
  );

/* Flattens a source of sources: `switchMap` with the identity function. */
let switchAll = source => switchMap((. inner) => inner, source);