Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.0 6.1 kB view raw
open Wonka_types;

/* Interop re-exports: conversions between Wonka sources and
   Observables / Callbags, implemented in their own modules. */
[@genType]
let fromObservable = Wonka_observable.fromObservable;
[@genType]
let toObservable = Wonka_observable.toObservable;

[@genType]
let fromCallbag = Wonka_callbag.fromCallbag;
[@genType]
let toCallbag = Wonka_callbag.toCallbag;

/* operators */

/* debounce(f): forwards a pushed value only after f(. value) milliseconds
   have passed without another push arriving.

   - Every Push cancels the pending timer (if any) and schedules a new one.
   - When the timer fires, the value is forwarded; if the source has ended
     in the meantime, End is forwarded right after it.
   - An End with no pending timer is forwarded immediately.
   - A Close from the sink cancels the pending timer and is passed upstream. */
[@genType]
let debounce = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Set once the source has sent End; the End is deferred while a
         debounced value is still waiting on its timer. */
      let gotEndSignal = ref(false);
      let id: ref(option(Js.Global.timeoutId)) = ref(None);

      /* Cancels the pending timer, if any, and forgets its id. */
      let clearTimeout = () =>
        switch (id^) {
        | Some(timeoutId) =>
          id := None;
          Js.Global.clearTimeout(timeoutId);
        | None => ()
        };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          /* Wrap the upstream talkback so that Close also drops the timer. */
          sink(.
            Start(
              (. signal) =>
                switch (signal) {
                | Close =>
                  clearTimeout();
                  tb(. Close);
                | _ => tb(. signal)
                },
            ),
          )
        | Push(x) =>
          clearTimeout();
          id :=
            Some(
              Js.Global.setTimeout(
                () => {
                  id := None;
                  sink(. signal);
                  /* The source ended while this value was waiting. */
                  if (gotEndSignal^) {
                    sink(. End);
                  };
                },
                f(. x),
              ),
            );
        | End =>
          gotEndSignal := true;

          /* Only end now if nothing is pending; otherwise the timer
             callback above emits End after the last value. */
          switch (id^) {
          | None => sink(. End)
          | _ => ()
          };
        }
      );
    })
  );

/* Mutable state shared by the two callbacks of the delay operator. */
type delayStateT = {
  /* Upstream talkback; a placeholder until the source sends Start. */
  mutable talkback: (. talkbackT) => unit,
  /* Number of signals scheduled but not yet delivered to the sink. */
  mutable active: int,
  /* Set when the sink sends Close (despite the name, an End coming from
     the source does not set it; that End is simply delayed like a Push). */
  mutable gotEndSignal: bool,
};

/* delay(wait): forwards every Push and End from the source to the sink
   `wait` milliseconds after it was received.

   Once the sink sends Close, new signals from the source are ignored and
   the Close is forwarded upstream so the source can release its resources.
   Signals that were already scheduled before the Close are still delivered
   when their timers fire. */
[@genType]
let delay = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: delayStateT = {
        talkback: Wonka_helpers.talkbackPlaceholder,
        active: 0,
        gotEndSignal: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkback = tb
        | _ when !state.gotEndSignal =>
          state.active = state.active + 1;
          ignore(
            Js.Global.setTimeout(
              () => {
                /* `active` counts pending timers including this one, so it
                   is always >= 1 here; just account for this delivery. */
                state.active = state.active - 1;
                sink(. signal);
              },
              wait,
            ),
          );
        | _ => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              if (!state.gotEndSignal) {
                /* Flag first so anything the source emits in response to
                   Close is ignored, then cancel upstream exactly once. */
                state.gotEndSignal = true;
                state.talkback(. Close);
              };
              if (state.active === 0) {
                sink(. End);
              };
            | _ when !state.gotEndSignal => state.talkback(. signal)
            | _ => ()
            },
        ),
      );
    })
  );

/* throttle(f): forwards a pushed value immediately, then drops every
   further Push until f(. value) milliseconds have elapsed.

   End is forwarded immediately; both End and a Close from the sink cancel
   the running timer. */
[@genType]
let throttle = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* True while the throttle window is open and pushes are dropped. */
      let skip = ref(false);
      let id: ref(option(Js.Global.timeoutId)) = ref(None);

      /* Cancels the running timer, if any, and forgets its id so a stale
         id is never cleared twice. */
      let clearTimeout = () =>
        switch (id^) {
        | Some(timeoutId) =>
          id := None;
          Js.Global.clearTimeout(timeoutId);
        | None => ()
        };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          /* Wrap the upstream talkback so that Close also drops the timer. */
          sink(.
            Start(
              (. signal) =>
                switch (signal) {
                | Close =>
                  clearTimeout();
                  tb(. Close);
                | _ => tb(. signal)
                },
            ),
          )
        | End =>
          clearTimeout();
          sink(. End);
        | Push(x) when ! skip^ =>
          skip := true;
          clearTimeout();
          id :=
            Some(
              Js.Global.setTimeout(
                () => {
                  id := None;
                  skip := false;
                },
                f(. x),
              ),
            );
          sink(. signal);
        | Push(_) => ()
        }
      );
    })
  );

/* sinks */

/* toPromise(source): returns a promise that resolves with the value pushed
   by Wonka_operators.takeLast(1, source), presumably the final value of
   the source (confirm against takeLast).

   The promise is never rejected, and End on its own does nothing, so a
   source that ends without pushing leaves the promise pending. */
[@genType]
let toPromise = (source: sourceT('a)): Js.Promise.t('a) => {
  Js.Promise.make((~resolve, ~reject as _) =>
    Wonka_operators.takeLast(1, source, (. signal) =>
      switch (signal) {
      /* Request a value as soon as the talkback is available. */
      | Start(x) => x(. Pull)
      | Push(x) => resolve(. x)
      | End => ()
      }
    )
  );
};

/* sources */

/* interval(p): pushes 0, 1, 2, ... to the sink every p milliseconds.
   The interval timer is cleared when the sink sends Close; this source
   never sends End by itself. */
[@genType]
let interval = (p: int): sourceT(int) =>
  curry(sink => {
    let i = ref(0);
    let id =
      Js.Global.setInterval(
        () => {
          let num = i^;
          i := i^ + 1;
          sink(. Push(num));
        },
        p,
      );

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => Js.Global.clearInterval(id)
          | _ => ()
          },
      ),
    );
  });

/* fromDomEvent(element, event): pushes every DOM event of the given type
   dispatched on `element`. The listener is removed when the sink sends
   Close; this source never sends End by itself. */
[@genType]
let fromDomEvent = (element: Dom.element, event: string): sourceT(Dom.event) =>
  curry(sink => {
    let addEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.addEventListener(event, handler);
    }
  |}
    ];

    let removeEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.removeEventListener(event, handler);
    }
  |}
    ];

    /* The same function value is used for add and remove so that the
       listener can actually be detached. */
    let handler = event => sink(. Push(event));

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => removeEventListener(element, event, handler)
          | _ => ()
          },
      ),
    );

    addEventListener(element, event, handler);
  });

/* fromPromise(promise): pushes the resolved value of `promise` followed by
   End, unless the sink has sent Close before the promise resolved.
   Only the fulfilment path is handled here; a rejection is not observed. */
[@genType]
let fromPromise = (promise: Js.Promise.t('a)): sourceT('a) =>
  curry(sink => {
    /* Set when the sink closes, so a late resolution is dropped. */
    let ended = ref(false);

    ignore(
      Js.Promise.then_(
        value => {
          if (! ended^) {
            sink(. Push(value));
            sink(. End);
          };

          Js.Promise.resolve();
        },
        promise,
      ),
    );

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => ended := true
          | _ => ()
          },
      ),
    );
  });