Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.2 2.3 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by all callbacks of a single `concatMap`
   subscription. */
type concatMapStateT('a) = {
  /* Outer values that arrived while an inner source was still active. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback of the outer source; a placeholder until its `Start` arrives. */
  mutable outerTalkback: (.talkbackT) => unit,
  /* Talkback of the currently running inner source, or the placeholder. */
  mutable innerTalkback: (.talkbackT) => unit,
  /* True between an inner source's `Start` and its `End`. */
  mutable innerActive: bool,
  /* True once the sink has sent `Close`. */
  mutable closed: bool,
  /* True once the outer source has ended or the sink has sent `Close`. */
  mutable ended: bool
};

/* Maps every value of the outer `source` through `f` to an inner source and
   forwards the inner sources' values to `sink` one inner source at a time.

   Outer values received while an inner source is active are queued and are
   only passed to `f` after the active inner source has ended. `End` is sent
   to the sink once the outer source has ended, no inner source is active and
   the queue is empty.

   NOTE(review): `source` is subscribed to before `sink` receives `Start`, so a
   source that pushes synchronously reaches the sink before its `Start` signal.
   This ordering is kept as-is; confirm whether sinks rely on it. */
let concatMap = f => curry(source => curry(sink => {
  let state: concatMapStateT('a) = {
    inputQueue: Rebel.MutableQueue.make(),
    outerTalkback: talkbackPlaceholder,
    innerTalkback: talkbackPlaceholder,
    innerActive: false,
    closed: false,
    ended: false
  };

  /* Subscribes to one inner source; when it ends, moves on to the next queued
     input, if any. */
  let rec applyInnerSource = innerSource =>
    innerSource((.signal) => {
      switch (signal) {
      | End => {
        state.innerActive = false;
        state.innerTalkback = talkbackPlaceholder;

        /* After the sink has closed, nothing may be started or signalled
           anymore, even when the inner source still reports its `End`. */
        if (!state.closed) {
          switch (Rebel.MutableQueue.pop(state.inputQueue)) {
          | Some(input) => applyInnerSource(f(.input))
          | None when state.ended => sink(.End)
          | None => ()
          };
        };
      }
      | Start(tb) => {
        state.innerActive = true;
        state.innerTalkback = tb;
        tb(.Pull);
      }
      | Push(x) when !state.closed => {
        sink(.Push(x));
        /* Immediately ask the inner source for its next value. */
        state.innerTalkback(.Pull);
      }
      | Push(_) => ()
      }
    });

  source((.signal) => {
    switch (signal) {
    | End when !state.ended => {
      state.ended = true;
      /* Only finish right away when there is no pending inner work; otherwise
         the inner `End` handler sends `End` once the queue has drained. */
      if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
        sink(.End);
      }
    }
    | End => ()
    | Start(tb) => {
      state.outerTalkback = tb;
      tb(.Pull);
    }
    | Push(x) when !state.ended => {
      if (state.innerActive) {
        Rebel.MutableQueue.add(state.inputQueue, x);
      } else {
        applyInnerSource(f(.x));
      }

      state.outerTalkback(.Pull);
    }
    | Push(_) => ()
    }
  });

  sink(.Start((.signal) => {
    switch (signal) {
    | Pull => if (!state.ended) state.innerTalkback(.Pull)
    | Close => {
      /* Remember whether the outer source still needs to be closed: it must
         not be if it has already ended or was closed by an earlier `Close`. */
      let outerEnded = state.ended;

      /* Mark the operator as closed before notifying anyone, so that a
         synchronous `End` or `Push` from the inner source during `Close` is
         ignored. This also covers a `Close` that arrives after the outer
         source has ended. */
      state.ended = true;
      state.closed = true;

      state.innerTalkback(.Close);
      state.innerTalkback = talkbackPlaceholder;

      if (!outerEnded) {
        state.outerTalkback(.Close);
      };
    }
    }
  }));
}));

/* Flattens a source of sources by running the inner sources one after the
   other; `concatMap` with the identity function. */
let concatAll = source => concatMap((.x) => x, source);

/* Concatenates an array of sources: each source is run to completion before
   the next one is started. */
let concat = sources => {
  open Wonka_source_fromArray;
  concatMap((.x) => x, fromArray(sources));
};