Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.5 12 kB view raw
import * as deriving from './helpers/Wonka_deriving';
import * as sources from './Wonka_sources.gen';
import * as operators from './Wonka_operators.gen';
import * as types from './Wonka_types.gen';
import * as web from './web/WonkaJs.gen';

import callbagFromArray from 'callbag-from-iter';
import Observable from 'zen-observable';

/* Subscribes to `source` and records every signal the sink receives.
   A Pull talkback signal is sent right after the Start signal and again
   after every Push signal. When `onStart` is passed, it is called with the
   talkback function before that first Pull is sent.
   The returned array is the same array the sink keeps appending to, so
   signals that arrive later (e.g. after timers run) show up in it too. */
const collectSignals = (
  source: types.sourceT<any>,
  onStart?: (talkbackCb: (tb: types.talkbackT) => void) => void
) => {
  let talkback = null;
  const signals = [];

  source(signal => {
    signals.push(signal);
    if (deriving.isStart(signal)) {
      talkback = deriving.unboxStart(signal);
      if (onStart) onStart(talkback);
      talkback(deriving.pull);
    } else if (deriving.isPush(signal)) {
      talkback(deriving.pull);
    }
  });

  return signals;
};

/* When a Close talkback signal is sent the source should immediately end */
const passesActiveClose = (source: types.sourceT<any>) =>
  it('stops emitting when a Close talkback signal is received (spec)', () => {
    let talkback = null;

    const sink: types.sinkT<any> = signal => {
      /* Neither a Push nor an End signal may ever reach this sink */
      expect(deriving.isPush(signal)).toBeFalsy();
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        talkback(deriving.close);
      }
    };

    source(sink);
    /* A Start signal must have been received synchronously */
    expect(talkback).not.toBe(null);
  });

/* All synchronous, cold sources won't send anything unless a Pull signal
   has been received. */
const passesColdPull = (source: types.sourceT<any>) =>
  it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
    let pushes = 0;
    let talkback = null;

    const sink: types.sinkT<any> = signal => {
      if (deriving.isPush(signal)) {
        pushes++;
      } else if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
    };

    source(sink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    /* Only send the first Pull after some (fake) time has passed */
    setTimeout(() => {
      expect(pushes).toBe(0);
      talkback(deriving.pull);
    }, 10);

    jest.runAllTimers();
    expect(pushes).toBe(1);
  });

/* All synchronous, cold sources need to use trampoline scheduling to avoid
   recursively sending more and more Push signals which would eventually lead
   to a call stack overflow when too many values are emitted. */
const passesTrampoline = (source: types.sourceT<any>) =>
  it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
    let talkback = null;
    let pushes = 0;

    const signals = [];
    const sink: types.sinkT<any> = signal => {
      if (deriving.isPush(signal)) {
        const lastPushes = ++pushes;
        signals.push(signal);
        talkback(deriving.pull);
        /* The Pull above must not have triggered a nested Push */
        expect(lastPushes).toBe(pushes);
      } else if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        talkback(deriving.pull);
        /* Both values are expected to have arrived once the first Pull returns */
        expect(pushes).toBe(2);
      } else if (deriving.isEnd(signal)) {
        signals.push(signal);
        expect(pushes).toBe(2);
      }
    };

    source(sink);

    expect(signals).toEqual([
      deriving.push(1),
      deriving.push(2),
      deriving.end(),
    ]);
  });

/* Every test runs with Jest's fake timers unless a suite opts out */
beforeEach(() => {
  jest.useFakeTimers();
});

describe('fromArray', () => {
  passesTrampoline(sources.fromArray([1, 2]));
  passesColdPull(sources.fromArray([0]));
  passesActiveClose(sources.fromArray([0]));
});

describe('fromList', () => {
  passesTrampoline(sources.fromList([1, [2]] as any));
  passesColdPull(sources.fromList([0] as any));
  passesActiveClose(sources.fromList([0] as any));
});

describe('fromValue', () => {
  passesColdPull(sources.fromValue(0));
  passesActiveClose(sources.fromValue(0));

  it('sends a single value and ends', () => {
    expect(collectSignals(sources.fromValue(1))).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.end()
    ]);
  });
});

describe('merge', () => {
  const source = operators.merge<any>([
    sources.fromValue(0),
    sources.empty
  ]);

  passesColdPull(source);
  passesActiveClose(source);

  it('correctly merges two sources where the second is empty', () => {
    const source = operators.merge<any>([
      sources.fromValue(0),
      sources.empty
    ]);

    expect(collectSignals(source)).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(0),
      deriving.end(),
    ]);
  });

  it('correctly merges hot sources', () => {
    const onStart = jest.fn();
    const source = operators.merge<any>([
      operators.onStart(onStart)(sources.never),
      operators.onStart(onStart)(sources.fromArray([1, 2])),
    ]);

    const signals = collectSignals(source);
    expect(onStart).toHaveBeenCalledTimes(2);

    /* No End signal is expected here */
    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.push(2),
    ]);
  });

  it('correctly merges asynchronous sources', () => {
    jest.useFakeTimers();

    const onStart = jest.fn();
    const source = operators.merge<any>([
      operators.onStart(onStart)(sources.fromValue(-1)),
      operators.onStart(onStart)(
        operators.take(2)(web.interval(50))
      ),
    ]);

    const signals = collectSignals(source);
    jest.advanceTimersByTime(100);
    expect(onStart).toHaveBeenCalledTimes(2);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(-1),
      deriving.push(0),
      deriving.push(1),
      deriving.end(),
    ]);
  });
});

describe('concat', () => {
  const source = operators.concat<any>([
    sources.fromValue(0),
    sources.empty
  ]);

  passesColdPull(source);
  passesActiveClose(source);

  it('correctly concats two sources where the second is empty', () => {
    const source = operators.concat<any>([
      sources.fromValue(0),
      sources.empty
    ]);

    expect(collectSignals(source)).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(0),
      deriving.end(),
    ]);
  });
});

describe('make', () => {
  it('may be used to create async sources', () => {
    const teardown = jest.fn();
    const source = sources.make(observer => {
      setTimeout(() => observer.next(1), 10);
      setTimeout(() => observer.complete(), 20);
      return teardown;
    });

    const signals = collectSignals(source);
    /* Nothing but Start may arrive before the fake timers are run */
    expect(signals).toEqual([deriving.start(expect.any(Function))]);
    jest.runAllTimers();

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.end(),
    ]);
  });

  it('supports active cancellation', () => {
    const teardown = jest.fn();
    const source = sources.make(() => teardown);

    const sink: types.sinkT<any> = signal => {
      expect(deriving.isPush(signal)).toBeFalsy();
      expect(deriving.isEnd(signal)).toBeFalsy();
      /* Send the Close signal from a timer callback instead of synchronously */
      if (deriving.isStart(signal))
        setTimeout(() => deriving.unboxStart(signal)(deriving.close));
    };

    source(sink);
    expect(teardown).not.toHaveBeenCalled();
    jest.runAllTimers();
    expect(teardown).toHaveBeenCalled();
  });
});

describe('makeSubject', () => {
  it('may be used to emit signals programmatically', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
    ]);

    next(1);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
    ]);

    complete();

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.end(),
    ]);
  });

  it('ignores signals after complete has been called', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);
    complete();

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.end(),
    ]);

    /* Neither call may add another signal */
    next(1);
    complete();
    expect(signals.length).toBe(2);
  });
});

describe('never', () => {
  it('emits nothing and does not end', () => {
    const signals = collectSignals(sources.never);
    expect(signals).toEqual([deriving.start(expect.any(Function)) ]);
  });
});

describe('empty', () => {
  it('emits nothing and ends immediately', () => {
    const signals = collectSignals(sources.empty);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.end(),
    ]);
  });
});

describe('fromPromise', () => {
  passesActiveClose(web.fromPromise(Promise.resolve(null)));

  it('emits a value when the promise resolves', async () => {
    const promise = Promise.resolve(1);
    const signals = collectSignals(web.fromPromise(promise));

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
    ]);

    await promise;

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.end(),
    ]);
  });
});

describe('fromObservable', () => {
  /* These tests wait on a real setTimeout, so they opt out of fake timers */
  beforeEach(() => {
    jest.useRealTimers();
  });

  it('converts an Observable to a Wonka source', async () => {
    const source = web.fromObservable(Observable.from([1, 2]));
    const signals = collectSignals(source);

    await new Promise(resolve => setTimeout(resolve));

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.push(2),
      deriving.end(),
    ]);
  });

  it('supports cancellation on converted Observables', async () => {
    const source = web.fromObservable(Observable.from([1, 2]));
    const signals = collectSignals(source, talkback => {
      talkback(deriving.close);
    });

    await new Promise(resolve => setTimeout(resolve));

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });
});

describe('fromCallbag', () => {
  it('converts a Callbag to a Wonka source', () => {
    const source = web.fromCallbag(callbagFromArray([1, 2]));
    const signals = collectSignals(source);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.push(2),
      deriving.end(),
    ]);
  });

  it('supports cancellation on converted Callbags', () => {
    const source = web.fromCallbag(callbagFromArray([1, 2]));
    const signals = collectSignals(source, talkback => {
      talkback(deriving.close);
    });

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });
});

describe('interval', () => {
  it('emits Push signals until Cancel is sent', () => {
    let pushes = 0;
    let talkback = null;

    const sink: types.sinkT<any> = signal => {
      if (deriving.isPush(signal)) {
        pushes++;
      } else if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
    };

    web.interval(100)(sink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    jest.advanceTimersByTime(100);
    expect(pushes).toBe(1);
    jest.advanceTimersByTime(100);
    expect(pushes).toBe(2);

    /* After the Close signal no further Push may be counted */
    talkback(deriving.close);
    jest.advanceTimersByTime(100);
    expect(pushes).toBe(2);
  });
});

describe('fromDomEvent', () => {
  it('emits Push signals for events on a DOM element', () => {
    let talkback = null;

    /* Minimal stand-in for a DOM element; only the two listener methods exist */
    const element = {
      addEventListener: jest.fn(),
      removeEventListener: jest.fn(),
    };

    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal))
        talkback = deriving.unboxStart(signal);
    };

    web.fromDomEvent(element as any, 'click')(sink);

    expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
    expect(element.removeEventListener).not.toHaveBeenCalled();
    /* Grab the handler that was registered on the mocked element */
    const listener = element.addEventListener.mock.calls[0][1];

    listener(1);
    listener(2);
    talkback(deriving.close);
    expect(element.removeEventListener).toHaveBeenCalledWith('click', listener);
  });
});