Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.3 5.8 kB view raw
1open Wonka_types; 2 3[@genType] 4let fromObservable = Wonka_observable.fromObservable; 5[@genType] 6let toObservable = Wonka_observable.toObservable; 7 8[@genType] 9let fromCallbag = Wonka_callbag.fromCallbag; 10[@genType] 11let toCallbag = Wonka_callbag.toCallbag; 12 13/* operators */ 14 15type debounceStateT = { 16 mutable id: option(Js.Global.timeoutId), 17 mutable deferredEnded: bool, 18 mutable ended: bool, 19}; 20 21[@genType] 22let debounce = (f: (. 'a) => int): operatorT('a, 'a) => 23 curry(source => 24 curry(sink => { 25 let state: debounceStateT = { 26 id: None, 27 deferredEnded: false, 28 ended: false, 29 }; 30 31 let clearTimeout = () => 32 switch (state.id) { 33 | Some(timeoutId) => 34 state.id = None; 35 Js.Global.clearTimeout(timeoutId); 36 | None => () 37 }; 38 39 source((. signal) => 40 switch (signal) { 41 | Start(tb) => 42 sink(. 43 Start( 44 (. signal) => 45 if (!state.ended) { 46 switch (signal) { 47 | Close => 48 state.ended = true; 49 state.deferredEnded = false; 50 clearTimeout(); 51 tb(. Close); 52 | Pull => tb(. Pull) 53 }; 54 }, 55 ), 56 ) 57 | Push(x) when !state.ended => 58 clearTimeout(); 59 state.id = 60 Some( 61 Js.Global.setTimeout( 62 () => { 63 state.id = None; 64 sink(. signal); 65 if (state.deferredEnded) { 66 sink(. End); 67 }; 68 }, 69 f(. x), 70 ), 71 ); 72 | Push(_) => () 73 | End when !state.ended => 74 state.ended = true; 75 switch (state.id) { 76 | Some(_) => state.deferredEnded = true 77 | None => sink(. End) 78 }; 79 | End => () 80 } 81 ); 82 }) 83 ); 84 85[@genType] 86let delay = (wait: int): operatorT('a, 'a) => 87 curry(source => 88 curry(sink => { 89 let active = ref(0); 90 91 source((. signal) => 92 switch (signal) { 93 | Start(_) => sink(. signal) 94 | _ => 95 active := active^ + 1; 96 ignore( 97 Js.Global.setTimeout( 98 () => 99 if (active^ !== 0) { 100 active := active^ - 1; 101 sink(. signal); 102 }, 103 wait, 104 ), 105 ); 106 } 107 ); 108 }) 109 ); 110 111[@genType] 112let throttle = (f: (. 'a) => int): operatorT('a, 'a) => 113 curry(source => 114 curry(sink => { 115 let skip = ref(false); 116 let id: ref(option(Js.Global.timeoutId)) = ref(None); 117 let clearTimeout = () => 118 switch (id^) { 119 | Some(timeoutId) => Js.Global.clearTimeout(timeoutId) 120 | None => () 121 }; 122 123 source((. signal) => 124 switch (signal) { 125 | Start(tb) => 126 sink(. 127 Start( 128 (. signal) => 129 switch (signal) { 130 | Close => 131 clearTimeout(); 132 tb(. Close); 133 | _ => tb(. signal) 134 }, 135 ), 136 ) 137 | End => 138 clearTimeout(); 139 sink(. End); 140 | Push(x) when ! skip^ => 141 skip := true; 142 clearTimeout(); 143 id := 144 Some( 145 Js.Global.setTimeout( 146 () => { 147 id := None; 148 skip := false; 149 }, 150 f(. x), 151 ), 152 ); 153 sink(. signal); 154 | Push(_) => () 155 } 156 ); 157 }) 158 ); 159 160/* sinks */ 161[@genType] 162let toPromise = (source: sourceT('a)): Js.Promise.t('a) => { 163 Js.Promise.make((~resolve, ~reject as _) => 164 Wonka_operators.takeLast(1, source, (. signal) => 165 switch (signal) { 166 | Start(x) => x(. Pull) 167 | Push(x) => resolve(. x) 168 | End => () 169 } 170 ) 171 ); 172}; 173 174/* sources */ 175[@genType] 176let interval = (p: int): sourceT(int) => 177 curry(sink => { 178 let i = ref(0); 179 let id = 180 Js.Global.setInterval( 181 () => { 182 let num = i^; 183 i := i^ + 1; 184 sink(. Push(num)); 185 }, 186 p, 187 ); 188 189 sink(. 190 Start( 191 (. signal) => 192 switch (signal) { 193 | Close => Js.Global.clearInterval(id) 194 | _ => () 195 }, 196 ), 197 ); 198 }); 199 200[@genType] 201let fromDomEvent = (element: Dom.element, event: string): sourceT(Dom.event) => 202 curry(sink => { 203 let addEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw 204 {| 205 function (element, event, handler) { 206 element.addEventListener(event, handler); 207 } 208 |} 209 ]; 210 211 let removeEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw 212 {| 213 function (element, event, handler) { 214 element.removeEventListener(event, handler); 215 } 216 |} 217 ]; 218 219 let handler = event => sink(. Push(event)); 220 221 sink(. 222 Start( 223 (. signal) => 224 switch (signal) { 225 | Close => removeEventListener(element, event, handler) 226 | _ => () 227 }, 228 ), 229 ); 230 231 addEventListener(element, event, handler); 232 }); 233 234[@genType] 235let fromPromise = (promise: Js.Promise.t('a)): sourceT('a) => 236 curry(sink => { 237 let ended = ref(false); 238 239 ignore( 240 Js.Promise.then_( 241 value => { 242 if (! ended^) { 243 sink(. Push(value)); 244 sink(. End); 245 }; 246 247 Js.Promise.resolve(); 248 }, 249 promise, 250 ), 251 ); 252 253 sink(. 254 Start( 255 (. signal) => 256 switch (signal) { 257 | Close => ended := true 258 | _ => () 259 }, 260 ), 261 ); 262 });