Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.0.0 11 kB view raw
1import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types'; 2import { push, start, talkbackPlaceholder } from '../helpers'; 3 4import * as sources from '../sources'; 5import * as operators from '../operators'; 6import * as callbag from '../callbag'; 7import * as observable from '../observable'; 8 9import callbagFromArray from 'callbag-from-iter'; 10import Observable from 'zen-observable'; 11 12const collectSignals = (source: Source<any>, onStart?: (talkbackCb: TalkbackFn) => void) => { 13 let talkback = talkbackPlaceholder; 14 const signals: Signal<any>[] = []; 15 source(signal => { 16 signals.push(signal); 17 if (signal === SignalKind.End) { 18 /*noop*/ 19 } else if (signal.tag === SignalKind.Start) { 20 talkback = signal[0]; 21 if (onStart) onStart(talkback); 22 talkback(TalkbackKind.Pull); 23 } else { 24 talkback(TalkbackKind.Pull); 25 } 26 }); 27 28 return signals; 29}; 30 31/* When a Close talkback signal is sent the source should immediately end */ 32const passesActiveClose = (source: Source<any>) => { 33 it('stops emitting when a Close talkback signal is received (spec)', () => { 34 let talkback: TalkbackFn | null = null; 35 const sink: Sink<any> = signal => { 36 expect(signal).not.toBe(SignalKind.End); 37 expect((signal as any).tag).not.toBe(SignalKind.Push); 38 if ((signal as any).tag === SignalKind.Start) { 39 (talkback = signal[0])(TalkbackKind.Close); 40 } 41 }; 42 source(sink); 43 expect(talkback).not.toBe(null); 44 }); 45}; 46 47/* All synchronous, cold sources won't send anything unless a Pull signal 48 has been received. */ 49const passesColdPull = (source: Source<any>) => { 50 it('sends nothing when no Pull talkback signal has been sent (spec)', () => { 51 let talkback: TalkbackFn | null = null; 52 let pushes = 0; 53 54 const sink: Sink<any> = signal => { 55 if (signal === SignalKind.End) { 56 /*noop*/ 57 } else if (signal.tag === SignalKind.Push) { 58 pushes++; 59 } else { 60 talkback = signal[0]; 61 } 62 }; 63 64 source(sink); 65 expect(talkback).not.toBe(null); 66 expect(pushes).toBe(0); 67 68 setTimeout(() => { 69 expect(pushes).toBe(0); 70 talkback!(TalkbackKind.Pull); 71 }, 10); 72 73 jest.runAllTimers(); 74 expect(pushes).toBe(1); 75 }); 76}; 77 78/* All synchronous, cold sources need to use trampoline scheduling to avoid 79 recursively sending more and more Push signals which would eventually lead 80 to a call stack overflow when too many values are emitted. */ 81const passesTrampoline = (source: Source<any>) => { 82 it('uses trampoline scheduling instead of recursive push signals (spec)', () => { 83 let talkback: TalkbackFn | null = null; 84 let pushes = 0; 85 86 const signals: Signal<any>[] = []; 87 const sink: Sink<any> = signal => { 88 if (signal === SignalKind.End) { 89 signals.push(signal); 90 expect(pushes).toBe(2); 91 } else if (signal.tag === SignalKind.Push) { 92 const lastPushes = ++pushes; 93 signals.push(signal); 94 talkback!(TalkbackKind.Pull); 95 expect(lastPushes).toBe(pushes); 96 } else if (signal.tag === SignalKind.Start) { 97 (talkback = signal[0])(TalkbackKind.Pull); 98 expect(pushes).toBe(2); 99 } 100 }; 101 102 source(sink); 103 expect(signals).toEqual([push(1), push(2), SignalKind.End]); 104 }); 105}; 106 107beforeEach(() => { 108 jest.useFakeTimers(); 109}); 110 111describe('fromArray', () => { 112 passesTrampoline(sources.fromArray([1, 2])); 113 passesColdPull(sources.fromArray([0])); 114 passesActiveClose(sources.fromArray([0])); 115}); 116 117describe('fromValue', () => { 118 passesColdPull(sources.fromValue(0)); 119 passesActiveClose(sources.fromValue(0)); 120 121 it('sends a single value and ends', () => { 122 expect(collectSignals(sources.fromValue(1))).toEqual([ 123 start(expect.any(Function)), 124 push(1), 125 SignalKind.End, 126 ]); 127 }); 128}); 129 130describe('merge', () => { 131 const source = operators.merge<any>([sources.fromValue(0), sources.empty]); 132 133 passesColdPull(source); 134 passesActiveClose(source); 135 136 it('correctly merges two sources where the second is empty', () => { 137 const source = operators.merge<any>([sources.fromValue(0), sources.empty]); 138 139 expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]); 140 }); 141 142 it('correctly merges hot sources', () => { 143 const onStart = jest.fn(); 144 const source = operators.merge<any>([ 145 operators.onStart(onStart)(sources.never), 146 operators.onStart(onStart)(sources.fromArray([1, 2])), 147 ]); 148 149 const signals = collectSignals(source); 150 expect(onStart).toHaveBeenCalledTimes(2); 151 152 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2)]); 153 }); 154 155 it('correctly merges asynchronous sources', () => { 156 jest.useFakeTimers(); 157 158 const onStart = jest.fn(); 159 const source = operators.merge<any>([ 160 operators.onStart(onStart)(sources.fromValue(-1)), 161 operators.onStart(onStart)(operators.take(2)(sources.interval(50))), 162 ]); 163 164 const signals = collectSignals(source); 165 jest.advanceTimersByTime(100); 166 expect(onStart).toHaveBeenCalledTimes(2); 167 168 expect(signals).toEqual([ 169 start(expect.any(Function)), 170 push(-1), 171 push(0), 172 push(1), 173 SignalKind.End, 174 ]); 175 }); 176}); 177 178describe('concat', () => { 179 const source = operators.concat<any>([sources.fromValue(0), sources.empty]); 180 181 passesColdPull(source); 182 passesActiveClose(source); 183 184 it('correctly concats two sources where the second is empty', () => { 185 const source = operators.concat<any>([sources.fromValue(0), sources.empty]); 186 187 expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]); 188 }); 189}); 190 191describe('make', () => { 192 it('may be used to create async sources', () => { 193 const teardown = jest.fn(); 194 const source = sources.make(observer => { 195 setTimeout(() => observer.next(1), 10); 196 setTimeout(() => observer.complete(), 20); 197 return teardown; 198 }); 199 200 const signals = collectSignals(source); 201 expect(signals).toEqual([start(expect.any(Function))]); 202 jest.runAllTimers(); 203 204 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]); 205 }); 206 207 it('supports active cancellation', () => { 208 const teardown = jest.fn(); 209 const source = sources.make(() => teardown); 210 211 const sink: Sink<any> = signal => { 212 expect(signal).not.toBe(SignalKind.End); 213 expect((signal as any).tag).not.toBe(SignalKind.Push); 214 setTimeout(() => signal[0](TalkbackKind.Close)); 215 }; 216 217 source(sink); 218 expect(teardown).not.toHaveBeenCalled(); 219 jest.runAllTimers(); 220 expect(teardown).toHaveBeenCalled(); 221 }); 222}); 223 224describe('makeSubject', () => { 225 it('may be used to emit signals programmatically', () => { 226 const { source, next, complete } = sources.makeSubject(); 227 const signals = collectSignals(source); 228 229 expect(signals).toEqual([start(expect.any(Function))]); 230 231 next(1); 232 233 expect(signals).toEqual([start(expect.any(Function)), push(1)]); 234 235 complete(); 236 237 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]); 238 }); 239 240 it('ignores signals after complete has been called', () => { 241 const { source, next, complete } = sources.makeSubject(); 242 const signals = collectSignals(source); 243 complete(); 244 245 expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]); 246 247 next(1); 248 complete(); 249 expect(signals.length).toBe(2); 250 }); 251}); 252 253describe('never', () => { 254 it('emits nothing and ends immediately', () => { 255 const signals = collectSignals(sources.never); 256 expect(signals).toEqual([start(expect.any(Function))]); 257 }); 258}); 259 260describe('empty', () => { 261 it('emits nothing and ends immediately', () => { 262 const signals = collectSignals(sources.empty); 263 264 expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]); 265 }); 266}); 267 268describe('fromPromise', () => { 269 passesActiveClose(sources.fromPromise(Promise.resolve(null))); 270 271 it('emits a value when the promise resolves', async () => { 272 const promise = Promise.resolve(1); 273 const signals = collectSignals(sources.fromPromise(promise)); 274 275 expect(signals).toEqual([start(expect.any(Function))]); 276 277 await promise; 278 279 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]); 280 }); 281}); 282 283describe('fromObservable', () => { 284 beforeEach(() => { 285 jest.useRealTimers(); 286 }); 287 288 it('converts an Observable to a Wonka source', async () => { 289 const source = observable.fromObservable(Observable.from([1, 2])); 290 const signals = collectSignals(source); 291 292 await new Promise(resolve => setTimeout(resolve)); 293 294 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]); 295 }); 296 297 it('supports cancellation on converted Observables', async () => { 298 const source = observable.fromObservable(Observable.from([1, 2])); 299 const signals = collectSignals(source, talkback => { 300 talkback(TalkbackKind.Close); 301 }); 302 303 await new Promise(resolve => setTimeout(resolve)); 304 305 expect(signals).toEqual([start(expect.any(Function))]); 306 }); 307}); 308 309describe('fromCallbag', () => { 310 it('converts a Callbag to a Wonka source', () => { 311 const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any); 312 const signals = collectSignals(source); 313 314 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]); 315 }); 316 317 it('supports cancellation on converted Observables', () => { 318 const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any); 319 const signals = collectSignals(source, talkback => { 320 talkback(TalkbackKind.Close); 321 }); 322 323 expect(signals).toEqual([start(expect.any(Function))]); 324 }); 325}); 326 327describe('interval', () => { 328 it('emits Push signals until Cancel is sent', () => { 329 let pushes = 0; 330 let talkback: TalkbackFn | null = null; 331 332 const sink: Sink<any> = signal => { 333 if (signal === SignalKind.End) { 334 /*noop*/ 335 } else if (signal.tag === SignalKind.Push) { 336 pushes++; 337 } else { 338 talkback = signal[0]; 339 } 340 }; 341 342 sources.interval(100)(sink); 343 expect(talkback).not.toBe(null); 344 expect(pushes).toBe(0); 345 346 jest.advanceTimersByTime(100); 347 expect(pushes).toBe(1); 348 jest.advanceTimersByTime(100); 349 expect(pushes).toBe(2); 350 351 talkback!(TalkbackKind.Close); 352 jest.advanceTimersByTime(100); 353 expect(pushes).toBe(2); 354 }); 355}); 356 357describe('fromDomEvent', () => { 358 it('emits Push signals for events on a DOM element', () => { 359 let talkback: TalkbackFn | null = null; 360 361 const element = { 362 addEventListener: jest.fn(), 363 removeEventListener: jest.fn(), 364 }; 365 366 const sink: Sink<any> = signal => { 367 expect(signal).not.toBe(SignalKind.End); 368 if ((signal as any).tag === SignalKind.Start) talkback = signal[0]; 369 }; 370 371 sources.fromDomEvent(element as any, 'click')(sink); 372 373 expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function)); 374 expect(element.removeEventListener).not.toHaveBeenCalled(); 375 const listener = element.addEventListener.mock.calls[0][1]; 376 377 listener(1); 378 listener(2); 379 talkback!(TalkbackKind.Close); 380 expect(element.removeEventListener).toHaveBeenCalledWith('click', listener); 381 }); 382});