Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.1 2.7 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared by the callbacks of one `concatMap` subscription. */
type concatMapStateT('a) = {
  /* Outer values received while an inner source was still active. */
  inputQueue: Rebel.MutableQueue.t('a),
  /* Talkback of the outer source; a placeholder until it sends `Start`. */
  mutable outerTalkback: (. talkbackT) => unit,
  /* Talkback of the current inner source; a placeholder when none is active. */
  mutable innerTalkback: (. talkbackT) => unit,
  /* True between an inner source's `Start` and its `End`. */
  mutable innerActive: bool,
  /* True once the sink has sent `Close`. */
  mutable closed: bool,
  /* True once the outer source has ended or the sink has closed. */
  mutable ended: bool,
};

/**
 * Maps every value of the outer `source` to an inner source with `f` and
 * forwards the inner sources' values to the sink, one inner source at a time.
 *
 * Outer values that arrive while an inner source is still active are queued
 * and turned into inner sources in arrival order once the active one ends.
 * `End` is sent to the sink when the outer source has ended, no inner source
 * is active and the queue is empty.
 *
 * Once the sink sends `Close`, nothing more is delivered to it: pending inner
 * pushes are dropped, queued inputs are never started and no `End` is sent.
 */
let concatMap = f =>
  curry(source =>
    curry(sink => {
      let state: concatMapStateT('a) = {
        inputQueue: Rebel.MutableQueue.make(),
        outerTalkback: talkbackPlaceholder,
        innerTalkback: talkbackPlaceholder,
        innerActive: false,
        closed: false,
        ended: false,
      };

      /* Subscribes to one inner source and relays its signals to the sink. */
      let rec applyInnerSource = innerSource =>
        innerSource((. signal) =>
          switch (signal) {
          /* The sink has closed: do not start queued inner sources and do
             not send `End` to a sink that is no longer listening. */
          | End when state.closed => ()
          | End =>
            state.innerActive = false;
            state.innerTalkback = talkbackPlaceholder;

            switch (Rebel.MutableQueue.pop(state.inputQueue)) {
            /* Continue with the next queued outer value. */
            | Some(input) => applyInnerSource(f(. input))
            /* Nothing queued and the outer source is done: finish. */
            | None when state.ended => sink(. End)
            /* Nothing queued yet; wait for the next outer value. */
            | None => ()
            };
          | Start(tb) =>
            state.innerActive = true;
            state.innerTalkback = tb;
            tb(. Pull);
          | Push(x) when !state.closed =>
            sink(. Push(x));
            /* Immediately request the next inner value. */
            state.innerTalkback(. Pull);
          | Push(_) => ()
          }
        );

      source((. signal) =>
        switch (signal) {
        | End when !state.ended =>
          state.ended = true;
          /* Only finish now if there is no inner work left; otherwise the
             inner `End` handler sends `End` once the queue has drained. */
          if (!state.innerActive
              && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
            sink(. End);
          };
        | End => ()
        | Start(tb) =>
          state.outerTalkback = tb;
          tb(. Pull);
        | Push(x) when !state.ended =>
          if (state.innerActive) {
            Rebel.MutableQueue.add(state.inputQueue, x);
          } else {
            applyInnerSource(f(. x));
          };

          state.outerTalkback(. Pull);
        | Push(_) => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Pull =>
              if (!state.ended) {
                state.innerTalkback(. Pull);
              }
            | Close =>
              /* Record the close before notifying anyone, and do so even
                 when the outer source has already ended, so that every
                 later (or synchronous) inner signal is dropped. */
              state.closed = true;
              state.innerTalkback(. Close);
              state.innerTalkback = talkbackPlaceholder;
              if (!state.ended) {
                state.ended = true;
                state.outerTalkback(. Close);
              };
            },
        ),
      );
    })
  );

/* Flattens a source of sources by concatenating them in order;
   equivalent to `concatMap` with the identity function. */
let concatAll = source => concatMap((. x) => x, source);

/* Concatenates `sources` in order: the collection is turned into a source
   with `fromArray` and then flattened with an identity `concatMap`. */
let concat = sources => {
  Wonka_source_fromArray.(concatMap((. x) => x, fromArray(sources)));
};