Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.2.0 12 kB view raw
1import { describe, it, expect, vi } from 'vitest'; 2 3import { Source, Sink, SignalKind, TalkbackKind } from '../types'; 4import { push, start } from '../helpers'; 5 6import * as sinks from '../sinks'; 7import * as sources from '../sources'; 8import * as callbag from '../callbag'; 9import * as observable from '../observable'; 10 11import Observable from 'zen-observable'; 12import callbagIterate from 'callbag-iterate'; 13import callbagTake from 'callbag-take'; 14 15describe('subscribe', () => { 16 it('sends Pull talkback signals every Push signal', () => { 17 let pulls = 0; 18 const fn = vi.fn(); 19 20 const source: Source<any> = sink => { 21 sink( 22 start(signal => { 23 if (signal === TalkbackKind.Pull) { 24 if (pulls < 3) { 25 pulls++; 26 sink(push(0)); 27 } else { 28 sink(SignalKind.End); 29 expect(pulls).toBe(3); 30 } 31 } 32 }) 33 ); 34 }; 35 36 sinks.subscribe(fn)(source); 37 expect(fn).toHaveBeenCalledTimes(3); 38 expect(pulls).toBe(3); 39 }); 40 41 it('cancels when unsubscribe is called', () => { 42 let pulls = 0; 43 let closing = 0; 44 45 const source: Source<any> = sink => { 46 sink( 47 start(signal => { 48 if (signal === TalkbackKind.Pull) { 49 if (!pulls) { 50 pulls++; 51 sink(push(0)); 52 } 53 } else { 54 closing++; 55 } 56 }) 57 ); 58 }; 59 60 const sub = sinks.subscribe(() => {})(source); 61 expect(pulls).toBe(1); 62 63 sub.unsubscribe(); 64 expect(closing).toBe(1); 65 }); 66 67 it('ignores cancellation when the source has already ended', () => { 68 let pulls = 0; 69 let closing = 0; 70 71 const source: Source<any> = sink => { 72 sink( 73 start(signal => { 74 if (signal === TalkbackKind.Pull) { 75 pulls++; 76 sink(SignalKind.End); 77 } else { 78 closing++; 79 } 80 }) 81 ); 82 }; 83 84 const sub = sinks.subscribe(() => {})(source); 85 expect(pulls).toBe(1); 86 sub.unsubscribe(); 87 expect(closing).toBe(0); 88 }); 89 90 it('ignores Push signals after the source has ended', () => { 91 const fn = vi.fn(); 92 const source: Source<any> = sink => { 93 sink( 94 start(signal => { 95 if (signal === TalkbackKind.Pull) { 96 sink(SignalKind.End); 97 sink(push(0)); 98 } 99 }) 100 ); 101 }; 102 103 sinks.subscribe(fn)(source); 104 expect(fn).not.toHaveBeenCalled(); 105 }); 106 107 it('ignores Push signals after cancellation', () => { 108 const fn = vi.fn(); 109 const source: Source<any> = sink => { 110 sink( 111 start(signal => { 112 if (signal === TalkbackKind.Close) { 113 sink(push(0)); 114 } 115 }) 116 ); 117 }; 118 119 sinks.subscribe(fn)(source).unsubscribe(); 120 expect(fn).not.toHaveBeenCalled(); 121 }); 122}); 123 124describe('publish', () => { 125 it('sends Pull talkback signals every Push signal', () => { 126 let pulls = 0; 127 const source: Source<any> = sink => { 128 sink( 129 start(signal => { 130 if (signal === TalkbackKind.Pull) { 131 if (pulls < 3) { 132 pulls++; 133 sink(push(0)); 134 } else { 135 sink(SignalKind.End); 136 expect(pulls).toBe(3); 137 } 138 } 139 }) 140 ); 141 }; 142 143 sinks.publish(source); 144 expect(pulls).toBe(3); 145 }); 146}); 147 148describe('toArray', () => { 149 it('sends Pull talkback signals every Push signal', () => { 150 let pulls = 0; 151 const source: Source<any> = sink => { 152 sink( 153 start(signal => { 154 if (signal === TalkbackKind.Pull) { 155 if (pulls < 3) { 156 pulls++; 157 sink(push(0)); 158 } else { 159 sink(SignalKind.End); 160 expect(pulls).toBe(3); 161 } 162 } 163 }) 164 ); 165 }; 166 167 const array = sinks.toArray(source); 168 expect(array).toEqual([0, 0, 0]); 169 expect(pulls).toBe(3); 170 }); 171 172 it('sends a Close talkback signal after all synchronous values have been pulled', () => { 173 let pulls = 0; 174 let ending = 0; 175 176 const source: Source<any> = sink => { 177 sink( 178 start(signal => { 179 if (signal === TalkbackKind.Pull) { 180 if (!pulls) { 181 pulls++; 182 sink(push(0)); 183 } 184 } else { 185 ending++; 186 } 187 }) 188 ); 189 }; 190 191 const array = sinks.toArray(source); 192 expect(array).toEqual([0]); 193 expect(ending).toBe(1); 194 }); 195}); 196 197describe('toPromise', () => { 198 it('creates a Promise that resolves on the last value', async () => { 199 let pulls = 0; 200 let sink: Sink<any> | null = null; 201 202 const source: Source<any> = _sink => { 203 sink = _sink; 204 sink( 205 start(signal => { 206 if (signal === TalkbackKind.Pull) pulls++; 207 }) 208 ); 209 }; 210 211 const fn = vi.fn(); 212 const promise = sinks.toPromise(source).then(fn); 213 214 expect(pulls).toBe(1); 215 sink!(push(0)); 216 expect(pulls).toBe(2); 217 sink!(push(1)); 218 sink!(SignalKind.End); 219 expect(fn).not.toHaveBeenCalled(); 220 221 await promise; 222 expect(fn).toHaveBeenCalledWith(1); 223 }); 224 225 it('creates a Promise for synchronous sources', async () => { 226 const fn = vi.fn(); 227 await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn); 228 expect(fn).toHaveBeenCalledWith(3); 229 }); 230}); 231 232describe('toAsyncIterable', () => { 233 it('creates an async iterable mirroring the Wonka source', async () => { 234 let pulls = 0; 235 let sink: Sink<any> | null = null; 236 237 const source: Source<any> = _sink => { 238 sink = _sink; 239 sink( 240 start(signal => { 241 if (signal === TalkbackKind.Pull) pulls++; 242 }) 243 ); 244 }; 245 246 const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); 247 248 expect(pulls).toBe(1); 249 sink!(push(0)); 250 expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); 251 expect(pulls).toBe(2); 252 253 sink!(push(1)); 254 expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); 255 expect(pulls).toBe(3); 256 257 sink!(SignalKind.End); 258 expect(await asyncIterator.next()).toEqual({ done: true }); 259 expect(pulls).toBe(3); 260 }); 261 262 it('buffers actively pushed values', async () => { 263 let pulls = 0; 264 let sink: Sink<any> | null = null; 265 266 const source: Source<any> = _sink => { 267 sink = _sink; 268 sink( 269 start(signal => { 270 if (signal === TalkbackKind.Pull) pulls++; 271 }) 272 ); 273 }; 274 275 const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); 276 277 sink!(push(0)); 278 sink!(push(1)); 279 sink!(SignalKind.End); 280 281 expect(pulls).toBe(1); 282 expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); 283 expect(await asyncIterator.next()).toEqual({ value: 1, done: false }); 284 expect(await asyncIterator.next()).toEqual({ done: true }); 285 }); 286 287 it('asynchronously waits for pulled values', async () => { 288 let pulls = 0; 289 let sink: Sink<any> | null = null; 290 291 const source: Source<any> = _sink => { 292 sink = _sink; 293 sink( 294 start(signal => { 295 if (signal === TalkbackKind.Pull) pulls++; 296 }) 297 ); 298 }; 299 300 const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); 301 expect(pulls).toBe(1); 302 303 let resolved = false; 304 305 const promise = asyncIterator.next().then(value => { 306 resolved = true; 307 return value; 308 }); 309 310 await Promise.resolve(); 311 expect(resolved).toBe(false); 312 313 sink!(push(0)); 314 sink!(SignalKind.End); 315 expect(await promise).toEqual({ value: 0, done: false }); 316 expect(await asyncIterator.next()).toEqual({ done: true }); 317 }); 318 319 it('supports cancellation via return', async () => { 320 let ended = false; 321 let sink: Sink<any> | null = null; 322 323 const source: Source<any> = _sink => { 324 sink = _sink; 325 sink( 326 start(signal => { 327 if (signal === TalkbackKind.Close) ended = true; 328 }) 329 ); 330 }; 331 332 const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator](); 333 334 sink!(push(0)); 335 expect(await asyncIterator.next()).toEqual({ value: 0, done: false }); 336 expect(await asyncIterator.return!()).toEqual({ done: true }); 337 338 sink!(push(1)); 339 expect(await asyncIterator.next()).toEqual({ done: true }); 340 341 expect(ended).toBeTruthy(); 342 }); 343 344 it('supports for-await-of', async () => { 345 let pulls = 0; 346 347 const source: Source<any> = sink => { 348 sink( 349 start(signal => { 350 if (signal === TalkbackKind.Pull) { 351 sink(pulls < 3 ? push(pulls++) : SignalKind.End); 352 } 353 }) 354 ); 355 }; 356 357 const iterable = sinks.toAsyncIterable(source); 358 const values: any[] = []; 359 for await (const value of iterable) { 360 values.push(value); 361 } 362 363 expect(values).toEqual([0, 1, 2]); 364 }); 365 366 it('supports for-await-of with early break', async () => { 367 let pulls = 0; 368 let closed = false; 369 370 const source: Source<any> = sink => { 371 sink( 372 start(signal => { 373 if (signal === TalkbackKind.Pull) { 374 sink(pulls < 3 ? push(pulls++) : SignalKind.End); 375 } else { 376 closed = true; 377 } 378 }) 379 ); 380 }; 381 382 const iterable = sinks.toAsyncIterable(source); 383 for await (const value of iterable) { 384 expect(value).toBe(0); 385 break; 386 } 387 388 expect(closed).toBe(true); 389 }); 390}); 391 392describe('toObservable', () => { 393 it('creates an Observable mirroring the Wonka source', () => { 394 const next = vi.fn(); 395 const complete = vi.fn(); 396 let pulls = 0; 397 let sink: Sink<any> | null = null; 398 399 const source: Source<any> = _sink => { 400 sink = _sink; 401 sink( 402 start(signal => { 403 if (signal === TalkbackKind.Pull) pulls++; 404 }) 405 ); 406 }; 407 408 Observable.from(observable.toObservable(source) as any).subscribe({ 409 next, 410 complete, 411 }); 412 413 expect(pulls).toBe(1); 414 sink!(push(0)); 415 expect(next).toHaveBeenCalledWith(0); 416 sink!(push(1)); 417 expect(next).toHaveBeenCalledWith(1); 418 sink!(SignalKind.End); 419 expect(complete).toHaveBeenCalled(); 420 }); 421 422 it('forwards cancellations from the Observable as a talkback', () => { 423 let ending = 0; 424 const source: Source<any> = sink => 425 sink( 426 start(signal => { 427 if (signal === TalkbackKind.Close) ending++; 428 }) 429 ); 430 431 const sub = Observable.from(observable.toObservable(source) as any).subscribe({}); 432 433 expect(ending).toBe(0); 434 sub.unsubscribe(); 435 expect(ending).toBe(1); 436 }); 437}); 438 439describe('toCallbag', () => { 440 it('creates a Callbag mirroring the Wonka source', () => { 441 const fn = vi.fn(); 442 let pulls = 0; 443 let sink: Sink<any> | null = null; 444 445 const source: Source<any> = _sink => { 446 sink = _sink; 447 sink( 448 start(signal => { 449 if (signal === TalkbackKind.Pull) pulls++; 450 }) 451 ); 452 }; 453 454 callbagIterate(fn)(callbag.toCallbag(source)); 455 456 expect(pulls).toBe(1); 457 sink!(push(0)); 458 expect(fn).toHaveBeenCalledWith(0); 459 sink!(push(1)); 460 expect(fn).toHaveBeenCalledWith(1); 461 sink!(SignalKind.End); 462 }); 463 464 it('forwards cancellations from the Callbag as a talkback', () => { 465 let ending = 0; 466 const fn = vi.fn(); 467 468 const source: Source<any> = sink => 469 sink( 470 start(signal => { 471 if (signal === TalkbackKind.Pull) { 472 sink(push(0)); 473 } else { 474 ending++; 475 } 476 }) 477 ); 478 479 callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any)); 480 481 expect(fn.mock.calls).toEqual([[0]]); 482 expect(ending).toBe(1); 483 }); 484});