Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.1 2.6 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the outer and inner subscriptions of one
   `mergeMap` subscription. */
type mergeMapStateT = {
  /* Talkback received from the outer source's `Start` signal. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Talkbacks of the inner sources that have started and not yet ended. */
  mutable innerTalkbacks: Rebel.Array.t((. talkbackT) => unit),
  /* Set once the outer source has sent `End` or the sink has sent `Close`. */
  mutable ended: bool,
};

/* Maps every value pushed by `source` through `f` to an inner source and
   forwards the values of all inner sources to `sink`.

   `End` is sent to the sink once the outer source has ended and no inner
   talkback remains registered. When the sink sends `Close`, every registered
   inner talkback is closed exactly once, the registry is cleared so that late
   inner pushes are dropped, and the outer talkback is closed unless the outer
   source had already ended. */
let mergeMap = f =>
  curry(source =>
    curry(sink => {
      let state: mergeMapStateT = {
        outerTalkback: talkbackPlaceholder,
        innerTalkbacks: Rebel.Array.makeEmpty(),
        ended: false,
      };

      /* Subscribes to one inner source and wires its signals to the sink. */
      let applyInnerSource = innerSource => {
        let talkback = ref(talkbackPlaceholder);

        innerSource((. signal) =>
          switch (signal) {
          | End =>
            /* Unregister this inner source; it was the last piece of pending
               work if the outer source is already done. */
            state.innerTalkbacks =
              Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
            if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
              sink(. End);
            };
          | Start(tb) =>
            talkback := tb;
            state.innerTalkbacks =
              Rebel.Array.append(state.innerTalkbacks, tb);
            tb(. Pull);
          | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
            sink(. Push(x));
            talkback^(. Pull);
          /* The registry is empty (e.g. after `Close`): drop the value. */
          | Push(_) => ()
          }
        );
      };

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          /* Only complete now if no inner source is still running. */
          if (Rebel.Array.size(state.innerTalkbacks) === 0) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          applyInnerSource(f(. x));
          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Take the registered talkbacks and clear the registry before
                 closing anything: each inner source is closed exactly once,
                 and any push arriving afterwards fails the non-empty guard in
                 `applyInnerSource` instead of reaching the closed sink. */
              let innerTalkbacks = state.innerTalkbacks;
              state.innerTalkbacks = Rebel.Array.makeEmpty();
              Rebel.Array.forEach(innerTalkbacks, talkback =>
                talkback(. Close)
              );
              /* The outer source only needs closing if it has not ended. */
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
            | Pull when !state.ended =>
              Rebel.Array.forEach(state.innerTalkbacks, talkback =>
                talkback(. Pull)
              )
            | Pull => ()
            },
        ),
      );
    })
  );

/* Merges an array of sources: `sources` is turned into a source with
   `Wonka_source_fromArray.fromArray` and flattened with an identity
   `mergeMap`. */
let merge = sources => {
  Wonka_source_fromArray.(mergeMap((. x) => x, fromArray(sources)));
};

/* Flattens a source of sources by applying `mergeMap` with the identity
   function. */
let mergeAll = source => mergeMap((. x) => x, source);

/* Alias of `mergeAll`. */
let flatten = mergeAll;