Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at main 12 kB view raw
/**
 * Tests for the sink-side operators (`subscribe`, `publish`, `toArray`,
 * `toPromise`, `toAsyncIterable`) and the interop converters
 * (`toObservable`, `toCallbag`).
 *
 * Most tests build a hand-written `Source` that hands the sink a `start(...)`
 * signal whose talkback callback counts the `TalkbackKind.Pull` / non-Pull
 * signals it receives, so the test can assert how the sink under test drives
 * the source.
 */
import { describe, it, expect, vi } from 'vitest';

import { Source, Sink, SignalKind, TalkbackKind } from '../types';
import { push, start } from '../helpers';

import * as sinks from '../sinks';
import * as sources from '../sources';
import * as callbag from '../callbag';
import * as observable from '../observable';

import Observable from 'zen-observable';
import callbagIterate from 'callbag-iterate';
import callbagTake from 'callbag-take';

describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;
    const fn = vi.fn();

    // Answers the first three Pulls with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    sinks.subscribe(fn)(source);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(pulls).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pulls = 0;
    let closing = 0;

    // Pushes once on the first Pull, then stays silent; any non-Pull
    // talkback signal is counted in `closing`.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (!pulls) {
              pulls++;
              sink(push(0));
            }
          } else {
            closing++;
          }
        })
      );
    };

    const sub = sinks.subscribe(() => {})(source);
    expect(pulls).toBe(1);

    sub.unsubscribe();
    expect(closing).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pulls = 0;
    let closing = 0;

    // Ends immediately on the first Pull.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            pulls++;
            sink(SignalKind.End);
          } else {
            closing++;
          }
        })
      );
    };

    const sub = sinks.subscribe(() => {})(source);
    expect(pulls).toBe(1);
    sub.unsubscribe();
    expect(closing).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const fn = vi.fn();
    // Misbehaving source: sends a Push after it has already sent End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(SignalKind.End);
            sink(push(0));
          }
        })
      );
    };

    sinks.subscribe(fn)(source);
    expect(fn).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const fn = vi.fn();
    // Misbehaving source: answers the Close talkback with a Push.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) {
            sink(push(0));
          }
        })
      );
    };

    sinks.subscribe(fn)(source).unsubscribe();
    expect(fn).not.toHaveBeenCalled();
  });
});

describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;
    // Answers the first three Pulls with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    sinks.publish(source);
    expect(pulls).toBe(3);
  });
});

describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;
    // Answers the first three Pulls with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    const array = sinks.toArray(source);
    expect(array).toEqual([0, 0, 0]);
    expect(pulls).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pulls = 0;
    let ending = 0;

    // Pushes once and never sends End; non-Pull talkbacks are counted in
    // `ending`.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (!pulls) {
              pulls++;
              sink(push(0));
            }
          } else {
            ending++;
          }
        })
      );
    };

    const array = sinks.toArray(source);
    expect(array).toEqual([0]);
    expect(ending).toBe(1);
  });
});

describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // Captures the sink so the test can push values manually; the talkback
    // only counts Pulls.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    const fn = vi.fn();
    const promise = sinks.toPromise(source).then(fn);

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(pulls).toBe(2);
    sink!(push(1));
    sink!(SignalKind.End);
    // The `.then` callback has not run synchronously at this point.
    expect(fn).not.toHaveBeenCalled();

    await promise;
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const fn = vi.fn();
    await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

describe('toAsyncIterable', () => {
  it('creates an async iterable mirroring the Wonka source', async () => {
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // Captures the sink so the test can push values manually; the talkback
    // only counts Pulls.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const next$ = asyncIterator.next();

    sink!(push(0));
    expect(await next$).toEqual({ value: 0, done: false });
    expect(pulls).toBe(1);

    sink!(push(1));
    expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
    expect(pulls).toBe(2);

    sink!(SignalKind.End);
    expect(await asyncIterator.next()).toEqual({ done: true });
    expect(pulls).toBe(2);
  });

  it('buffers actively pushed values', async () => {
    let pulls = 0;
    let sink: Sink<any> | null = null;

    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const next$ = asyncIterator.next();

    // Two values and End arrive before the consumer awaits anything.
    sink!(push(0));
    sink!(push(1));
    sink!(SignalKind.End);

    expect(pulls).toBe(1);
    expect(await next$).toEqual({ value: 0, done: false });
    expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
    expect(await asyncIterator.next()).toEqual({ done: true });
  });

  it('asynchronously waits for pulled values', async () => {
    let pulls = 0;
    let sink: Sink<any> | null = null;

    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    // The promise returned by this first next() call is discarded; only the
    // second call's promise is observed below.
    asyncIterator.next();
    expect(pulls).toBe(1);

    let resolved = false;

    const promise = asyncIterator.next().then(value => {
      resolved = true;
      return value;
    });

    // Yield one microtask tick: nothing has been pushed yet, so the promise
    // is expected to still be pending.
    await Promise.resolve();
    expect(resolved).toBe(false);

    sink!(push(0));
    sink!(SignalKind.End);
    expect(await promise).toEqual({ value: 0, done: false });
    expect(await asyncIterator.next()).toEqual({ done: true });
  });

  it('supports cancellation via return', async () => {
    let ended = false;
    let sink: Sink<any> | null = null;

    // Records whether a Close talkback was received.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) ended = true;
        })
      );
    };

    const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const next$ = asyncIterator.next();

    sink!(push(0));
    expect(await next$).toEqual({ value: 0, done: false });
    expect(await asyncIterator.return!()).toEqual({ done: true });

    // A Push after return() is expected not to surface through next().
    sink!(push(1));
    expect(await asyncIterator.next()).toEqual({ done: true });

    expect(ended).toBeTruthy();
  });

  it('supports for-await-of', async () => {
    let pulls = 0;

    // Answers Pulls with 0, 1, 2 and then End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(pulls < 3 ? push(pulls++) : SignalKind.End);
          }
        })
      );
    };

    const iterable = sinks.toAsyncIterable(source);
    const values: any[] = [];
    for await (const value of iterable) {
      values.push(value);
    }

    expect(values).toEqual([0, 1, 2]);
  });

  it('supports for-await-of with early break', async () => {
    let pulls = 0;
    let closed = false;

    // Answers Pulls with 0, 1, 2 and then End; any non-Pull talkback sets
    // `closed`.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(pulls < 3 ? push(pulls++) : SignalKind.End);
          } else {
            closed = true;
          }
        })
      );
    };

    const iterable = sinks.toAsyncIterable(source);
    for await (const value of iterable) {
      expect(value).toBe(0);
      break;
    }

    expect(closed).toBe(true);
  });
});

describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const next = vi.fn();
    const complete = vi.fn();
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // Captures the sink so the test can push values manually; the talkback
    // only counts Pulls.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    Observable.from(observable.toObservable(source) as any).subscribe({
      next,
      complete,
    });

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(next).toHaveBeenCalledWith(0);
    sink!(push(1));
    expect(next).toHaveBeenCalledWith(1);
    sink!(SignalKind.End);
    expect(complete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let ending = 0;
    // Never pushes; only counts Close talkbacks.
    const source: Source<any> = sink =>
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) ending++;
        })
      );

    const sub = Observable.from(observable.toObservable(source) as any).subscribe({});

    expect(ending).toBe(0);
    sub.unsubscribe();
    expect(ending).toBe(1);
  });
});

describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const fn = vi.fn();
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // Captures the sink so the test can push values manually; the talkback
    // only counts Pulls.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    callbagIterate(fn)(callbag.toCallbag(source));

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(fn).toHaveBeenCalledWith(0);
    sink!(push(1));
    expect(fn).toHaveBeenCalledWith(1);
    // No assertion follows: this only exercises End without checking an
    // observable effect.
    sink!(SignalKind.End);
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let ending = 0;
    const fn = vi.fn();

    // Pushes 0 on every Pull; any non-Pull talkback is counted in `ending`.
    const source: Source<any> = sink =>
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(push(0));
          } else {
            ending++;
          }
        })
      );

    callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any));

    expect(fn.mock.calls).toEqual([[0]]);
    expect(ending).toBe(1);
  });
});