Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v5.0.0-rc.1 5.8 kB view raw
open Wonka_types;

/* Re-exports of the interop helpers defined in Wonka_observable. */
[@genType]
let fromObservable = Wonka_observable.fromObservable;
[@genType]
let toObservable = Wonka_observable.toObservable;

/* Re-exports of the interop helpers defined in Wonka_callbag. */
[@genType]
let fromCallbag = Wonka_callbag.fromCallbag;
[@genType]
let toCallbag = Wonka_callbag.toCallbag;

/* operators */

/* Mutable state shared by the callbacks of a single `debounce` subscription:
   - id: handle of the currently pending timeout, if any
   - deferredEnded: the source sent End while a timeout was still pending, so
     End must be forwarded once that timeout has fired
   - ended: End was received from the source, or Close from the sink */
type debounceStateT = {
  mutable id: option(Js.Global.timeoutId),
  mutable deferredEnded: bool,
  mutable ended: bool,
};

/* Debounces pushed values: every Push cancels the pending timeout and
   schedules a new one whose duration is `f(. value)` (passed as the delay
   argument of Js.Global.setTimeout). Only when a timeout fires is its value
   forwarded to the sink. If the source ends while a value is still pending,
   End is held back and emitted right after that value. */
[@genType]
let debounce = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: debounceStateT = {
        id: None,
        deferredEnded: false,
        ended: false,
      };

      /* Cancels the pending timeout (if any) and forgets its handle. */
      let clearTimeout = () =>
        switch (state.id) {
        | Some(timeoutId) =>
          state.id = None;
          Js.Global.clearTimeout(timeoutId);
        | None => ()
        };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          /* Wrap the talkback so that Close also cancels the pending value;
             talkback signals are ignored once the stream has ended. */
          sink(.
            Start(
              (. signal) =>
                if (!state.ended) {
                  switch (signal) {
                  | Close =>
                    state.ended = true;
                    state.deferredEnded = false;
                    clearTimeout();
                    tb(. Close);
                  | Pull => tb(. Pull)
                  };
                },
            ),
          )
        | Push(x) when !state.ended =>
          clearTimeout();
          state.id =
            Some(
              Js.Global.setTimeout(
                () => {
                  state.id = None;
                  sink(. signal);
                  /* The source ended while this value was pending. */
                  if (state.deferredEnded) {
                    sink(. End);
                  };
                },
                f(. x),
              ),
            );
        | Push(_) => ()
        | End when !state.ended =>
          state.ended = true;
          switch (state.id) {
          /* A value is still pending: let its timeout emit End afterwards. */
          | Some(_) => state.deferredEnded = true
          | None => sink(. End)
          };
        | End => ()
        }
      );
    })
  );

/* Forwards Start immediately and re-emits every other signal (Push and End)
   through a Js.Global.setTimeout scheduled with `wait`. */
[@genType]
let delay = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Number of signals that are scheduled but not yet delivered. */
      let active = ref(0);

      source((. signal) =>
        switch (signal) {
        | Start(_) => sink(. signal)
        | _ =>
          active := active^ + 1;
          ignore(
            Js.Global.setTimeout(
              () =>
                /* NOTE(review): `active` is only ever decremented here, so
                   this guard looks like it always holds; nothing in this
                   block resets it (e.g. on Close) - confirm intent. */
                if (active^ !== 0) {
                  active := active^ - 1;
                  sink(. signal);
                },
              wait,
            ),
          );
        }
      );
    })
  );

/* Throttles pushed values: a Push is forwarded right away and further Pushes
   are dropped until a timeout of `f(. value)` (the delay argument given to
   Js.Global.setTimeout) has fired. End and Close cancel the running timeout
   and are forwarded unchanged. */
[@genType]
let throttle = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* true while Pushes are being dropped */
      let skip = ref(false);
      let id: ref(option(Js.Global.timeoutId)) = ref(None);
      /* Cancels the running timeout (if any) and forgets its handle, the
         same way debounce's helper does, so no stale id is kept around. */
      let clearTimeout = () =>
        switch (id^) {
        | Some(timeoutId) =>
          id := None;
          Js.Global.clearTimeout(timeoutId);
        | None => ()
        };

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          sink(.
            Start(
              (. signal) =>
                switch (signal) {
                | Close =>
                  clearTimeout();
                  tb(. Close);
                | _ => tb(. signal)
                },
            ),
          )
        | End =>
          clearTimeout();
          sink(. End);
        | Push(x) when ! skip^ =>
          skip := true;
          clearTimeout();
          id :=
            Some(
              Js.Global.setTimeout(
                () => {
                  id := None;
                  skip := false;
                },
                f(. x),
              ),
            );
          sink(. signal);
        | Push(_) => ()
        }
      );
    })
  );

/* sinks */

/* Subscribes to `source` through Wonka_operators.takeLast(1, ...) and
   resolves the returned promise with the value that is pushed. The promise
   is never rejected (`reject` is unused).
   NOTE(review): End is ignored here, so if no value is ever pushed the
   promise presumably stays pending - confirm against takeLast. */
[@genType]
let toPromise = (source: sourceT('a)): Js.Promise.t('a) => {
  Js.Promise.make((~resolve, ~reject as _) => {
    Wonka_operators.takeLast(1, source, (. signal) =>
      switch (signal) {
      /* Request values as soon as the talkback is available. */
      | Start(x) => x(. Pull)
      | Push(x) => resolve(. x)
      | End => ()
      }
    );
    ();
  });
};

/* sources */

/* Pushes an incrementing counter (starting at 0) from a
   Js.Global.setInterval callback scheduled with `p`. The interval is cleared
   when the sink sends Close; every other talkback signal is ignored. */
[@genType]
let interval = (p: int): sourceT(int) =>
  curry(sink => {
    let i = ref(0);
    let id =
      Js.Global.setInterval(
        () => {
          let num = i^;
          i := i^ + 1;
          sink(. Push(num));
        },
        p,
      );

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => Js.Global.clearInterval(id)
          | _ => ()
          },
      ),
    );
  });

/* Pushes every `event` dispatched on `element` to the sink. The listener is
   removed again when the sink sends Close; every other talkback signal is
   ignored. */
[@genType]
let fromDomEvent = (element: Dom.element, event: string): sourceT(Dom.event) =>
  curry(sink => {
    let addEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.addEventListener(event, handler);
    }
  |}
    ];

    let removeEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.removeEventListener(event, handler);
    }
  |}
    ];

    /* Set once the sink has sent Close. */
    let ended = ref(false);

    let handler = event => sink(. Push(event));

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            ended := true;
            removeEventListener(element, event, handler);
          | _ => ()
          },
      ),
    );

    /* The sink may send Close synchronously while handling Start. In that
       case the removal above has already run, so registering the listener
       now would leak it and keep pushing events into a closed sink. */
    if (! ended^) {
      addEventListener(element, event, handler);
    };
  });

/* Pushes the value `promise` resolves with, immediately followed by End,
   unless the sink has sent Close before the promise resolved.
   NOTE(review): no rejection handler is attached in this block, so a
   rejected promise never produces a signal - confirm this is intended. */
[@genType]
let fromPromise = (promise: Js.Promise.t('a)): sourceT('a) =>
  curry(sink => {
    /* Set once the sink has sent Close. */
    let ended = ref(false);

    ignore(
      Js.Promise.then_(
        value => {
          if (! ended^) {
            sink(. Push(value));
            sink(. End);
          };

          Js.Promise.resolve();
        },
        promise,
      ),
    );

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => ended := true
          | _ => ()
          },
      ),
    );
  });