Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.2 9.9 kB view raw
1import * as deriving from './helpers/wonka_deriving'; 2import * as sources from './wonka_sources.gen'; 3import * as types from './wonka_types.gen'; 4import * as web from './web/wonkaJs.gen'; 5 6import callbagFromArray from 'callbag-from-iter'; 7import Observable from 'zen-observable'; 8 9const collectSignals = ( 10 source: types.sourceT<any>, 11 onStart?: (talkbackCb: (tb: types.talkbackT) => void) => void 12) => { 13 let talkback = null; 14 const signals = []; 15 16 source(signal => { 17 signals.push(signal); 18 if (deriving.isStart(signal)) { 19 talkback = deriving.unboxStart(signal); 20 if (onStart) onStart(talkback); 21 talkback(deriving.pull); 22 } else if (deriving.isPush(signal)) { 23 talkback(deriving.pull); 24 } 25 }) 26 27 return signals; 28}; 29 30/* When a Close talkback signal is sent the source should immediately end */ 31const passesActiveClose = (source: types.sourceT<any>) => 32 it('stops emitting when a Close talkback signal is received (spec)', () => { 33 let talkback = null; 34 35 const sink: types.sinkT<any> = signal => { 36 expect(deriving.isPush(signal)).toBeFalsy(); 37 expect(deriving.isEnd(signal)).toBeFalsy(); 38 if (deriving.isStart(signal)) { 39 talkback = deriving.unboxStart(signal); 40 talkback(deriving.close); 41 } 42 }; 43 44 source(sink); 45 expect(talkback).not.toBe(null); 46 }); 47 48/* All synchronous, cold sources won't send anything unless a Pull signal 49 has been received. */ 50const passesColdPull = (source: types.sourceT<any>) => 51 it('sends nothing when no Pull talkback signal has been sent (spec)', () => { 52 let pushes = 0; 53 let talkback = null; 54 55 const sink: types.sinkT<any> = signal => { 56 if (deriving.isPush(signal)) { 57 pushes++; 58 } else if (deriving.isStart(signal)) { 59 talkback = deriving.unboxStart(signal); 60 } 61 }; 62 63 source(sink); 64 expect(talkback).not.toBe(null); 65 expect(pushes).toBe(0); 66 67 setTimeout(() => { 68 expect(pushes).toBe(0); 69 talkback(deriving.pull); 70 }, 10); 71 72 jest.runAllTimers(); 73 expect(pushes).toBe(1); 74 }); 75 76/* All synchronous, cold sources need to use trampoline scheduling to avoid 77 recursively sending more and more Push signals which would eventually lead 78 to a call stack overflow when too many values are emitted. */ 79const passesTrampoline = (source: types.sourceT<any>) => 80 it('uses trampoline scheduling instead of recursive push signals (spec)', () => { 81 let talkback = null; 82 let pushes = 0; 83 84 const signals = []; 85 const sink: types.sinkT<any> = signal => { 86 if (deriving.isPush(signal)) { 87 const lastPushes = ++pushes; 88 signals.push(signal); 89 talkback(deriving.pull); 90 expect(lastPushes).toBe(pushes); 91 } else if (deriving.isStart(signal)) { 92 talkback = deriving.unboxStart(signal); 93 talkback(deriving.pull); 94 expect(pushes).toBe(2); 95 } else if (deriving.isEnd(signal)) { 96 signals.push(signal); 97 expect(pushes).toBe(2); 98 } 99 }; 100 101 source(sink); 102 103 expect(signals).toEqual([ 104 deriving.push(1), 105 deriving.push(2), 106 deriving.end(), 107 ]); 108 }); 109 110beforeEach(() => { 111 jest.useFakeTimers(); 112}); 113 114describe('fromArray', () => { 115 passesTrampoline(sources.fromArray([1, 2])); 116 passesColdPull(sources.fromArray([0])); 117 passesActiveClose(sources.fromArray([0])); 118}); 119 120describe('fromList', () => { 121 passesTrampoline(sources.fromList([1, [2]] as any)); 122 passesColdPull(sources.fromList([0] as any)); 123 passesActiveClose(sources.fromList([0] as any)); 124}); 125 126describe('fromValue', () => { 127 passesColdPull(sources.fromValue(0)); 128 passesActiveClose(sources.fromValue(0)); 129 130 it('sends a single value and ends', () => { 131 expect(collectSignals(sources.fromValue(1))).toEqual([ 132 deriving.start(expect.any(Function)), 133 deriving.push(1), 134 deriving.end() 135 ]); 136 }); 137}); 138 139describe('make', () => { 140 it('may be used to create async sources', () => { 141 const teardown = jest.fn(); 142 const source = sources.make(observer => { 143 setTimeout(() => observer.next(1), 10); 144 setTimeout(() => observer.complete(), 20); 145 return teardown; 146 }); 147 148 const signals = collectSignals(source); 149 expect(signals).toEqual([deriving.start(expect.any(Function))]); 150 jest.runAllTimers(); 151 152 expect(signals).toEqual([ 153 deriving.start(expect.any(Function)), 154 deriving.push(1), 155 deriving.end(), 156 ]); 157 }); 158 159 it('supports active cancellation', () => { 160 const teardown = jest.fn(); 161 const source = sources.make(() => teardown); 162 163 const sink: types.sinkT<any> = signal => { 164 expect(deriving.isPush(signal)).toBeFalsy(); 165 expect(deriving.isEnd(signal)).toBeFalsy(); 166 if (deriving.isStart(signal)) 167 setTimeout(() => deriving.unboxStart(signal)(deriving.close)); 168 }; 169 170 source(sink); 171 expect(teardown).not.toHaveBeenCalled(); 172 jest.runAllTimers(); 173 expect(teardown).toHaveBeenCalled(); 174 }); 175}); 176 177describe('makeSubject', () => { 178 it('may be used to emit signals programmatically', () => { 179 const { source, next, complete } = sources.makeSubject(); 180 const signals = collectSignals(source); 181 182 expect(signals).toEqual([ 183 deriving.start(expect.any(Function)), 184 ]); 185 186 next(1); 187 188 expect(signals).toEqual([ 189 deriving.start(expect.any(Function)), 190 deriving.push(1), 191 ]); 192 193 complete(); 194 195 expect(signals).toEqual([ 196 deriving.start(expect.any(Function)), 197 deriving.push(1), 198 deriving.end(), 199 ]); 200 }); 201 202 it('ignores signals after complete has been called', () => { 203 const { source, next, complete } = sources.makeSubject(); 204 const signals = collectSignals(source); 205 complete(); 206 207 expect(signals).toEqual([ 208 deriving.start(expect.any(Function)), 209 deriving.end(), 210 ]); 211 212 next(1); 213 complete(); 214 expect(signals.length).toBe(2); 215 }); 216}); 217 218describe('never', () => { 219 it('emits nothing and ends immediately', () => { 220 const signals = collectSignals(sources.never); 221 expect(signals).toEqual([deriving.start(expect.any(Function)) ]); 222 }); 223}); 224 225describe('empty', () => { 226 it('emits nothing and ends immediately', () => { 227 const signals = collectSignals(sources.empty); 228 229 expect(signals).toEqual([ 230 deriving.start(expect.any(Function)), 231 deriving.end(), 232 ]); 233 }); 234}); 235 236describe('fromPromise', () => { 237 passesActiveClose(web.fromPromise(Promise.resolve(null))); 238 239 it('emits a value when the promise resolves', async () => { 240 const promise = Promise.resolve(1); 241 const signals = collectSignals(web.fromPromise(promise)); 242 243 expect(signals).toEqual([ 244 deriving.start(expect.any(Function)), 245 ]); 246 247 await promise; 248 249 expect(signals).toEqual([ 250 deriving.start(expect.any(Function)), 251 deriving.push(1), 252 deriving.end(), 253 ]); 254 }); 255}); 256 257describe('fromObservable', () => { 258 beforeEach(() => { 259 jest.useRealTimers(); 260 }); 261 262 it('converts an Observable to a Wonka source', async () => { 263 const source = web.fromObservable(Observable.from([1, 2])); 264 const signals = collectSignals(source); 265 266 await new Promise(resolve => setTimeout(resolve)); 267 268 expect(signals).toEqual([ 269 deriving.start(expect.any(Function)), 270 deriving.push(1), 271 deriving.push(2), 272 deriving.end(), 273 ]); 274 }); 275 276 it('supports cancellation on converted Observables', async () => { 277 const source = web.fromObservable(Observable.from([1, 2])); 278 const signals = collectSignals(source, talkback => { 279 talkback(deriving.close); 280 }); 281 282 await new Promise(resolve => setTimeout(resolve)); 283 284 expect(signals).toEqual([ 285 deriving.start(expect.any(Function)), 286 ]); 287 }); 288}); 289 290describe('fromCallbag', () => { 291 it('converts a Callbag to a Wonka source', () => { 292 const source = web.fromCallbag(callbagFromArray([1, 2])); 293 const signals = collectSignals(source); 294 295 expect(signals).toEqual([ 296 deriving.start(expect.any(Function)), 297 deriving.push(1), 298 deriving.push(2), 299 deriving.end(), 300 ]); 301 }); 302 303 it('supports cancellation on converted Observables', () => { 304 const source = web.fromCallbag(callbagFromArray([1, 2])); 305 const signals = collectSignals(source, talkback => { 306 talkback(deriving.close); 307 }); 308 309 expect(signals).toEqual([ 310 deriving.start(expect.any(Function)), 311 ]); 312 }); 313}); 314 315describe('interval', () => { 316 it('emits Push signals until Cancel is sent', () => { 317 let pushes = 0; 318 let talkback = null; 319 320 const sink: types.sinkT<any> = signal => { 321 if (deriving.isPush(signal)) { 322 pushes++; 323 } else if (deriving.isStart(signal)) { 324 talkback = deriving.unboxStart(signal); 325 } 326 }; 327 328 web.interval(100)(sink); 329 expect(talkback).not.toBe(null); 330 expect(pushes).toBe(0); 331 332 jest.advanceTimersByTime(100); 333 expect(pushes).toBe(1); 334 jest.advanceTimersByTime(100); 335 expect(pushes).toBe(2); 336 337 talkback(deriving.close); 338 jest.advanceTimersByTime(100); 339 expect(pushes).toBe(2); 340 }); 341}); 342 343describe('fromDomEvent', () => { 344 it('emits Push signals for events on a DOM element', () => { 345 let talkback = null; 346 347 const element = { 348 addEventListener: jest.fn(), 349 removeEventListener: jest.fn(), 350 }; 351 352 const sink: types.sinkT<any> = signal => { 353 expect(deriving.isEnd(signal)).toBeFalsy(); 354 if (deriving.isStart(signal)) 355 talkback = deriving.unboxStart(signal); 356 }; 357 358 web.fromDomEvent(element as any, 'click')(sink); 359 360 expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function)); 361 expect(element.removeEventListener).not.toHaveBeenCalled(); 362 const listener = element.addEventListener.mock.calls[0][1]; 363 364 listener(1); 365 listener(2); 366 talkback(deriving.close); 367 expect(element.removeEventListener).toHaveBeenCalledWith('click', listener); 368 }); 369});