Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at main 11 kB view raw
import { describe, it, expect, beforeEach, vi } from 'vitest';

import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
import { push, start, talkbackPlaceholder } from '../helpers';

import * as sources from '../sources';
import * as operators from '../operators';
import * as callbag from '../callbag';
import * as observable from '../observable';

import callbagFromArray from 'callbag-from-iter';
import Observable from 'zen-observable';

/* Subscribes to `source` and records every signal it receives, in order.
   On Start the talkback is stored, handed to the optional `onStart` callback,
   and then a Pull is sent; every Push is answered with another Pull.
   The returned array is the live array the sink appends to, so callers may
   inspect it again after advancing timers or awaiting promises. */
const collectSignals = (source: Source<any>, onStart?: (talkbackCb: TalkbackFn) => void) => {
  let talkback = talkbackPlaceholder;
  const signals: Signal<any>[] = [];
  source(signal => {
    signals.push(signal);
    if (signal === SignalKind.End) {
      /*noop*/
    } else if (signal.tag === SignalKind.Start) {
      talkback = signal[0];
      if (onStart) onStart(talkback);
      // Pull is sent even when `onStart` has already sent Close
      talkback(TalkbackKind.Pull);
    } else {
      talkback(TalkbackKind.Pull);
    }
  });

  return signals;
};

/* When a Close talkback signal is sent the source should immediately end */
const passesActiveClose = (source: Source<any>) => {
  it('stops emitting when a Close talkback signal is received (spec)', () => {
    let talkback: TalkbackFn | null = null;
    const sink: Sink<any> = signal => {
      // After closing on Start, neither a Push nor an End may reach the sink
      expect(signal).not.toBe(SignalKind.End);
      expect((signal as any).tag).not.toBe(SignalKind.Push);
      if ((signal as any).tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Close);
      }
    };
    source(sink);
    expect(talkback).not.toBe(null);
  });
};

/* All synchronous, cold sources won't send anything unless a Pull signal
   has been received. */
const passesColdPull = (source: Source<any>) => {
  it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
    let talkback: TalkbackFn | null = null;
    let pushes = 0;

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        pushes++;
      } else {
        talkback = signal[0];
      }
    };

    source(sink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    // Still nothing pushed after a delay; only then is the first Pull sent
    setTimeout(() => {
      expect(pushes).toBe(0);
      talkback!(TalkbackKind.Pull);
    }, 10);

    vi.runAllTimers();
    expect(pushes).toBe(1);
  });
};

/* All synchronous, cold sources need to use trampoline scheduling to avoid
   recursively sending more and more Push signals which would eventually lead
   to a call stack overflow when too many values are emitted. */
const passesTrampoline = (source: Source<any>) => {
  it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
    let talkback: TalkbackFn | null = null;
    let pushes = 0;

    const signals: Signal<any>[] = [];
    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        signals.push(signal);
        expect(pushes).toBe(2);
      } else if (signal.tag === SignalKind.Push) {
        const lastPushes = ++pushes;
        signals.push(signal);
        talkback!(TalkbackKind.Pull);
        // The Pull above must not have delivered the next Push re-entrantly
        expect(lastPushes).toBe(pushes);
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
        // Both values are expected to have been pushed by the time the first Pull returns
        expect(pushes).toBe(2);
      }
    };

    source(sink);
    expect(signals).toEqual([push(1), push(2), SignalKind.End]);
  });
};

beforeEach(() => {
  vi.useFakeTimers();
});

describe('fromArray', () => {
  passesTrampoline(sources.fromArray([1, 2]));
  passesColdPull(sources.fromArray([0]));
  passesActiveClose(sources.fromArray([0]));
});

describe('fromValue', () => {
  passesColdPull(sources.fromValue(0));
  passesActiveClose(sources.fromValue(0));

  it('sends a single value and ends', () => {
    expect(collectSignals(sources.fromValue(1))).toEqual([
      start(expect.any(Function)),
      push(1),
      SignalKind.End,
    ]);
  });
});

describe('merge', () => {
  const source = operators.merge<any>([sources.fromValue(0), sources.empty]);

  passesColdPull(source);
  passesActiveClose(source);

  it('correctly merges two sources where the second is empty', () => {
    const source = operators.merge<any>([sources.fromValue(0), sources.empty]);

    expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
  });

  it('correctly merges hot sources', () => {
    const onStart = vi.fn();
    const source = operators.merge<any>([
      operators.onStart(onStart)(sources.never),
      operators.onStart(onStart)(sources.fromArray([1, 2])),
    ]);

    const signals = collectSignals(source);
    expect(onStart).toHaveBeenCalledTimes(2);

    // No End is expected here: one of the merged sources is `never`
    expect(signals).toEqual([start(expect.any(Function)), push(1), push(2)]);
  });

  it('correctly merges asynchronous sources', () => {
    // Also enabled by the file-level beforeEach; repeated here explicitly
    vi.useFakeTimers();

    const onStart = vi.fn();
    const source = operators.merge<any>([
      operators.onStart(onStart)(sources.fromValue(-1)),
      operators.onStart(onStart)(operators.take(2)(sources.interval(50))),
    ]);

    const signals = collectSignals(source);
    vi.advanceTimersByTime(100);
    expect(onStart).toHaveBeenCalledTimes(2);

    expect(signals).toEqual([
      start(expect.any(Function)),
      push(-1),
      push(0),
      push(1),
      SignalKind.End,
    ]);
  });
});

describe('concat', () => {
  const source = operators.concat<any>([sources.fromValue(0), sources.empty]);

  passesColdPull(source);
  passesActiveClose(source);

  it('correctly concats two sources where the second is empty', () => {
    const source = operators.concat<any>([sources.fromValue(0), sources.empty]);

    expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
  });
});

describe('make', () => {
  it('may be used to create async sources', () => {
    const teardown = vi.fn();
    const source = sources.make(observer => {
      setTimeout(() => observer.next(1), 10);
      setTimeout(() => observer.complete(), 20);
      return teardown;
    });

    const signals = collectSignals(source);
    // Only Start is seen until the fake timers are flushed
    expect(signals).toEqual([start(expect.any(Function))]);
    vi.runAllTimers();

    expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });

  it('supports active cancellation', () => {
    const teardown = vi.fn();
    const source = sources.make(() => teardown);

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      expect((signal as any).tag).not.toBe(SignalKind.Push);
      // Close is deferred to a timer so teardown can be checked before and after
      setTimeout(() => signal[0](TalkbackKind.Close));
    };

    source(sink);
    expect(teardown).not.toHaveBeenCalled();
    vi.runAllTimers();
    expect(teardown).toHaveBeenCalled();
  });
});

describe('makeSubject', () => {
  it('may be used to emit signals programmatically', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);

    expect(signals).toEqual([start(expect.any(Function))]);

    next(1);

    expect(signals).toEqual([start(expect.any(Function)), push(1)]);

    complete();

    expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });

  it('ignores signals after complete has been called', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);
    complete();

    expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);

    next(1);
    complete();
    expect(signals.length).toBe(2);
  });
});

describe('never', () => {
  it('emits nothing and never ends', () => {
    const signals = collectSignals(sources.never);
    expect(signals).toEqual([start(expect.any(Function))]);
  });
});

describe('empty', () => {
  it('emits nothing and ends immediately', () => {
    const signals = collectSignals(sources.empty);

    expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);
  });
});

describe('fromPromise', () => {
  passesActiveClose(sources.fromPromise(Promise.resolve(null)));

  it('emits a value when the promise resolves', async () => {
    const promise = Promise.resolve(1);
    const signals = collectSignals(sources.fromPromise(promise));

    expect(signals).toEqual([start(expect.any(Function))]);

    // Yield to the microtask queue so the promise's resolution can be delivered
    await Promise.resolve();
    await promise;
    await Promise.resolve();

    expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });
});

describe('fromObservable', () => {
  beforeEach(() => {
    // These tests wait on a real setTimeout, so the file-level fake timers are undone
    vi.useRealTimers();
  });

  it('converts an Observable to a Wonka source', async () => {
    const source = observable.fromObservable(Observable.from([1, 2]));
    const signals = collectSignals(source);

    await new Promise(resolve => setTimeout(resolve));

    expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
  });

  it('supports cancellation on converted Observables', async () => {
    const source = observable.fromObservable(Observable.from([1, 2]));
    const signals = collectSignals(source, talkback => {
      talkback(TalkbackKind.Close);
    });

    await new Promise(resolve => setTimeout(resolve));

    expect(signals).toEqual([start(expect.any(Function))]);
  });
});

describe('fromCallbag', () => {
  it('converts a Callbag to a Wonka source', () => {
    const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
    const signals = collectSignals(source);

    expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
  });

  it('supports cancellation on converted Callbags', () => {
    const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
    const signals = collectSignals(source, talkback => {
      talkback(TalkbackKind.Close);
    });

    expect(signals).toEqual([start(expect.any(Function))]);
  });
});

describe('interval', () => {
  it('emits Push signals until Cancel is sent', () => {
    let pushes = 0;
    let talkback: TalkbackFn | null = null;

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        pushes++;
      } else {
        talkback = signal[0];
      }
    };

    sources.interval(100)(sink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    vi.advanceTimersByTime(100);
    expect(pushes).toBe(1);
    vi.advanceTimersByTime(100);
    expect(pushes).toBe(2);

    // After Close, further ticks must not produce more pushes
    talkback!(TalkbackKind.Close);
    vi.advanceTimersByTime(100);
    expect(pushes).toBe(2);
  });
});

describe('fromDomEvent', () => {
  it('emits Push signals for events on a DOM element', () => {
    let talkback: TalkbackFn | null = null;

    // Minimal stand-in for a DOM element: only the two listener methods are mocked
    const element = {
      addEventListener: vi.fn(),
      removeEventListener: vi.fn(),
    };

    // NOTE(review): the sink does not count or assert the Push signals themselves;
    // only listener registration and removal are verified below.
    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      if ((signal as any).tag === SignalKind.Start) talkback = signal[0];
    };

    sources.fromDomEvent(element as any, 'click')(sink);

    expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
    expect(element.removeEventListener).not.toHaveBeenCalled();
    const listener = element.addEventListener.mock.calls[0][1];

    listener(1);
    listener(2);
    talkback!(TalkbackKind.Close);
    expect(element.removeEventListener).toHaveBeenCalledWith('click', listener);
  });
});