Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v3.2.2 3.5 kB view raw
open Wonka_types;

/* Drives an operator with a hand-rolled source that emits without being
   pulled.

   The source stores the sink it is given in a ref and hands it a Start
   signal whose talkback records every talkback signal it receives in the
   signals array. The operator is applied to that source together with a
   sink that keeps the Start talkback in a ref and records every other
   signal in res.

   The returned promise sends Push(1) immediately, Push(2) on a zero-delay
   timeout, End on a further zero-delay timeout, and finally resolves with
   the pair (signals, res) on a third zero-delay timeout. */
let testWithListenable = operator => {
  let sink = ref((. _: signalT(int)) => ());
  let signals = [||];
  let source = x => {
    /* Here the sink is stored before Start is delivered. */
    sink := x;
    x(. Start((. signal) => ignore(Js.Array.push(signal, signals))));
  };

  let talkback = ref((. _: talkbackT) => ());
  let res = [||];
  operator(source, (. signal) =>
    switch (signal) {
    | Start(x) => talkback := x
    | _ => ignore(Js.Array.push(signal, res))
    }
  );

  Js.Promise.make((~resolve, ~reject as _) => {
    sink^(. Push(1));
    ignore(
      Js.Global.setTimeout(
        () => {
          sink^(. Push(2));
          ignore(
            Js.Global.setTimeout(
              () => {
                sink^(. End);
                ignore(
                  Js.Global.setTimeout(() => resolve(. (signals, res)), 0),
                );
              },
              0,
            ),
          );
        },
        0,
      ),
    );
  });
};

/* Checks what an operator does when the downstream sink closes it.

   The setup mirrors testWithListenable, except that the source delivers
   Start before it stores the sink. The returned promise sends Push(1)
   immediately, sends Close through the talkback captured from the operator
   on a zero-delay timeout, and resolves with the pair (signals, res) on a
   further zero-delay timeout. signals holds the talkback signals that
   reached the source; res holds the non-Start signals the operator
   emitted. */
let testTalkbackEnd = operator => {
  let sink = ref((. _: signalT(int)) => ());
  let signals: array(talkbackT) = [||];
  let source = x => {
    x(. Start((. signal) => ignore(Js.Array.push(signal, signals))));
    sink := x;
  };

  let talkback = ref((. _: talkbackT) => ());
  let res = [||];
  operator(source, (. signal) =>
    switch (signal) {
    | Start(x) => talkback := x
    | _ => ignore(Js.Array.push(signal, res))
    }
  );

  Js.Promise.make((~resolve, ~reject as _) => {
    sink^(. Push(1));
    ignore(
      Js.Global.setTimeout(
        () => {
          talkback^(. Close);
          ignore(Js.Global.setTimeout(() => resolve(. (signals, res)), 0));
        },
        0,
      ),
    );
  });
};

/* Subscribes to a source and pulls it until it ends.

   On Start the talkback is stored and a Pull is sent. Each Push is recorded
   in res and answered with another Pull. End is recorded in res as well,
   after which the returned promise resolves with res. */
let testSource = source => {
  let talkback = ref((. _: talkbackT) => ());
  let res = [||];

  Js.Promise.make((~resolve, ~reject as _) =>
    source((. signal) =>
      switch (signal) {
      | Start(x) =>
        talkback := x;
        talkback^(. Pull);
      | Push(_) =>
        ignore(Js.Array.push(signal, res));
        talkback^(. Pull);
      | End =>
        ignore(Js.Array.push(signal, res));
        resolve(. res);
      }
    )
  );
};

/* Wraps a source so that the talkback signals sent to it can be inspected.

   Returns the pair (res, innerSource). innerSource subscribes to the given
   source and replaces the talkback in its Start signal with one that first
   records each talkback signal in res and then forwards it to the original
   talkback. All other signals are passed to the sink untouched. */
let testSourceOperator = source => {
  let res = [||];
  let innerSource = sink => {
    source((. signal) =>
      switch (signal) {
      | Start(outerTalkback) =>
        sink(.
          Start(
            (. talkback) => {
              /* push returns the new array length; discard it explicitly,
                 as the other helpers in this file do. */
              ignore(Js.Array.push(talkback, res));
              outerTalkback(. talkback);
            },
          ),
        )
      | _ => sink(. signal)
      }
    );
  };

  (res, innerSource);
};

/* Opaque type for the default export of the zen-observable module. */
type observableClassT;

[@bs.module] external observableClass: observableClassT = "zen-observable";

/* Binding to the from method of the zen-observable export, typed for an
   array argument. */
[@bs.send]
external _observableFromArray:
  (observableClassT, array('a)) => Wonka.observableT('a) =
  "from";

/* Binding to the same from method, typed for an observable argument. */
[@bs.send]
external _observableFrom:
  (observableClassT, Wonka.observableT('a)) => Wonka.observableT('a) =
  "from";

/* Binding to the forEach method of an observable; yields a promise. */
[@bs.send]
external observableForEach:
  (Wonka.observableT('a), 'a => unit) => Js.Promise.t(unit) =
  "forEach";

/* Builds an observable from an array through the zen-observable export. */
let observableFromArray = (arr: array('a)): Wonka.observableT('a) =>
  _observableFromArray(observableClass, arr);

/* Passes an existing observable through the from method of the
   zen-observable export. */
let observableFrom = (obs: Wonka.observableT('a)): Wonka.observableT('a) =>
  _observableFrom(observableClass, obs);

/* Binding to the callbag-from-iter module, typed here for arrays. */
[@bs.module]
external callbagFromArray: array('a) => Wonka.callbagT('a) =
  "callbag-from-iter";

/* Binding to the callbag-from-obs module. */
[@bs.module]
external callbagFromObservable: Wonka.observableT('a) => Wonka.callbagT('a) =
  "callbag-from-obs";

/* Binding to the callbag-iterate module: takes a callback and returns a
   function that consumes a callbag. Both steps are uncurried. */
[@bs.module]
external callbagIterate: (. ('a => unit)) => (. Wonka.callbagT('a)) => unit =
  "callbag-iterate";