Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 2.2 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable state shared by the callbacks that make up one `mergeMap`
   subscription.
   - outerTalkback: talkback received from the outer source's `Start` signal
     (a placeholder until then).
   - innerTalkbacks: talkbacks of the inner sources that have started and
     have not yet ended or been closed.
   - ended: set once the outer source has emitted `End` or the sink has sent
     `Close`. */
type mergeMapStateT = {
  mutable outerTalkback: (.talkbackT) => unit,
  mutable innerTalkbacks: Rebel.Array.t((.talkbackT) => unit),
  mutable ended: bool
};

/* Maps every value pushed by `source` through `f` to an inner source,
   subscribes to each inner source and forwards its `Push` signals to `sink`.
   `End` is sent to the sink once the outer source has ended and no inner
   talkback remains registered.
   NOTE(review): `curry` and `talkbackPlaceholder` are presumably provided by
   the opened Wonka_helpers module; confirm against that module. */
let mergeMap = f => curry(source => curry(sink => {
  let state: mergeMapStateT = {
    outerTalkback: talkbackPlaceholder,
    innerTalkbacks: Rebel.Array.makeEmpty(),
    ended: false
  };

  /* Subscribes to a single inner source and wires it into the shared state. */
  let applyInnerSource = innerSource => {
    /* Remembers this inner source's own talkback so it can be pulled again
       after each value and removed from the list when the source ends. */
    let talkback = ref(talkbackPlaceholder);

    innerSource((.signal) => {
      switch (signal) {
      | End => {
        state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
        /* The last inner source finishing after the outer one completes
           the merged stream. */
        if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
          sink(.End);
        }
      }
      | Start(tb) => {
        talkback := tb;
        state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb);
        tb(.Pull);
      }
      /* An empty talkback list means the sink has closed the stream (or no
         inner source is registered), so the value is dropped below. */
      | Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => {
        sink(.Push(x));
        talkback^(.Pull);
      }
      | Push(_) => ()
      }
    });
  };

  source((.signal) => {
    switch (signal) {
    | End when !state.ended => {
      state.ended = true;
      /* With no inner source still running the merged stream ends now;
         otherwise the last inner `End` will end it. */
      if (Rebel.Array.size(state.innerTalkbacks) === 0) {
        sink(.End);
      }
    }
    | End => ()
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      applyInnerSource(f(.x));
      state.outerTalkback(.Pull);
    }
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    switch (signal) {
    | Close => {
      /* Close every inner source exactly once and always forget them, even
         when the outer source has already ended; with the list empty the
         inner `Push` guard drops any late values instead of forwarding them
         to a sink that has closed. */
      Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close));
      state.innerTalkbacks = Rebel.Array.makeEmpty();
      /* The outer source only needs closing if it has not ended by itself. */
      if (!state.ended) {
        state.ended = true;
        state.outerTalkback(.Close);
      }
    }
    | Pull when !state.ended =>
      Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull));
    | Pull => ()
    }
  }));
}));

/* Merges an array of sources: the array is turned into a source of sources
   with `fromArray` and flattened with `mergeMap` using the identity function. */
let merge = sources => {
  open Wonka_source_fromArray;
  mergeMap((.x) => x, fromArray(sources));
};

/* Flattens a source of sources by merging every inner source as it arrives. */
let mergeAll = source => mergeMap((.x) => x, source);

/* Alias of `mergeAll`. */
let flatten = mergeAll;