Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.2 1.8 kB view raw
open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping shared between the sink callback installed by
   `subscribe` and the subscription handle it returns. */
type subscribeStateT = {
  /* Talkback handed over in the source's Start signal; initialised to
     talkbackPlaceholder until Start arrives. */
  mutable talkback: (. talkbackT) => unit,
  /* True once the source has sent End or `unsubscribe` has been called. */
  mutable ended: bool,
};

[@genType]
type subscribeConsumerT('a) = sourceT('a) => subscriptionT;

/* Starts `source`, calls `f` for every pushed value, and asks for the next
   value (Pull) after each one.

   Returns a subscription whose `unsubscribe` sends Close to the source.
   `unsubscribe` does nothing if the source already ended or if it was
   already called. Values pushed after the subscription has ended are
   ignored and do not trigger another Pull. */
[@genType]
let subscribe = (f: (. 'a) => unit): subscribeConsumerT('a) =>
  curry(source => {
    let state: subscribeStateT = {
      talkback: talkbackPlaceholder,
      ended: false,
    };

    source((. signal) =>
      switch (signal) {
      | Start(x) =>
        /* Remember the talkback, then request the first value. */
        state.talkback = x;
        x(. Pull);
      | Push(x) when !state.ended =>
        f(. x);
        state.talkback(. Pull);
      | Push(_) => () /* late value after End/unsubscribe: drop it */
      | End => state.ended = true
      }
    );

    {
      unsubscribe: () =>
        if (!state.ended) {
          /* Flip the flag before sending Close so that pushes arriving
             while Close is being handled are already ignored. */
          state.ended = true;
          state.talkback(. Close);
        },
    };
  });

[@genType]
type forEachConsumerT('a) = sourceT('a) => unit;

/* Same as `subscribe`, but the subscription handle is discarded, so the
   caller cannot cancel the source. */
[@genType]
let forEach = (f: (. 'a) => unit): forEachConsumerT('a) =>
  curry(source => ignore(subscribe(f, source)));

/* Starts `source` and keeps pulling from it while discarding every value.
   Returns the subscription so the caller can cancel it. */
[@genType]
let publish = (source: sourceT('a)): subscriptionT =>
  subscribe((. _) => (), source);

/* Mutable bookkeeping used by `toArray`. */
type toArrayStateT('a) = {
  /* Values received so far, in arrival order. */
  values: Rebel.MutableQueue.t('a),
  /* Talkback handed over in the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* Initialised to None and not touched afterwards in the code visible
     here; kept so the record shape stays unchanged. */
  mutable value: option('a),
  /* True once the source has sent End or `toArray` has sent Close. */
  mutable ended: bool,
};

/* Starts `source`, collects every value it pushes before the `source(...)`
   call returns, and returns those values as an array.

   If the source has not sent End by then, it is sent Close. From that point
   on the sink is marked as ended, so any value the source still pushes is
   ignored and no further Pull is sent (matching what `subscribe` does after
   `unsubscribe`). */
[@genType]
let toArray = (source: sourceT('a)): array('a) => {
  let state: toArrayStateT('a) = {
    values: Rebel.MutableQueue.make(),
    talkback: talkbackPlaceholder,
    value: None,
    ended: false,
  };

  source((. signal) =>
    switch (signal) {
    | Start(x) =>
      state.talkback = x;
      x(. Pull);
    | Push(value) when !state.ended =>
      Rebel.MutableQueue.add(state.values, value);
      state.talkback(. Pull);
    | Push(_) => () /* late value after End/Close: drop it, do not Pull */
    | End => state.ended = true
    }
  );

  if (!state.ended) {
    /* The source did not finish during the call above: stop it, and mark
       the sink as ended first so later pushes are not answered. */
    state.ended = true;
    state.talkback(. Close);
  };

  Rebel.MutableQueue.toArray(state.values);
};