1import { describe, it, expect, vi } from 'vitest';
2
3import { Source, Sink, SignalKind, TalkbackKind } from '../types';
4import { push, start } from '../helpers';
5
6import * as sinks from '../sinks';
7import * as sources from '../sources';
8import * as callbag from '../callbag';
9import * as observable from '../observable';
10
11import Observable from 'zen-observable';
12import callbagIterate from 'callbag-iterate';
13import callbagTake from 'callbag-take';
14
describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const onValue = vi.fn();
    let pullCount = 0;

    // Answers every Pull with one value until three were emitted, then ends.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(talkback);
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).toHaveBeenCalledTimes(3);
    expect(pullCount).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Emits a single value on the first Pull and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) {
          closeCount++;
        } else if (pullCount === 0) {
          pullCount++;
          sink(push(0));
        }
      });

      sink(talkback);
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);

    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Ends right away on Pull; any other talkback signal is counted as a close.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        switch (signal) {
          case TalkbackKind.Pull:
            pullCount++;
            sink(SignalKind.End);
            break;
          default:
            closeCount++;
        }
      });

      sink(talkback);
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);
    subscription.unsubscribe();
    expect(closeCount).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const onValue = vi.fn();

    // Misbehaving source: pushes a value after it has already signalled End.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) return;
        sink(SignalKind.End);
        sink(push(0));
      });

      sink(talkback);
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const onValue = vi.fn();

    // Misbehaving source: answers the Close talkback signal with a value.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Close) return;
        sink(push(0));
      });

      sink(talkback);
    };

    const subscription = sinks.subscribe(onValue)(source);
    subscription.unsubscribe();
    expect(onValue).not.toHaveBeenCalled();
  });
});
123
describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers every Pull with one value until three were emitted, then ends.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(talkback);
    };

    sinks.publish(source);
    expect(pullCount).toBe(3);
  });
});
147
describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers every Pull with one value until three were emitted, then ends.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(talkback);
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0, 0, 0]);
    expect(pullCount).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pullCount = 0;
    let endCount = 0;

    // Emits one value on the first Pull, stays silent on later Pulls,
    // and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) {
          endCount++;
        } else if (pullCount === 0) {
          pullCount++;
          sink(push(0));
        }
      });

      sink(talkback);
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0]);
    expect(endCount).toBe(1);
  });
});
196
describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    const onResolved = vi.fn();
    let pullCount = 0;
    let downstream: Sink<any> | null = null;

    // Manual source: captures the sink so the test pushes signals itself,
    // and counts the Pull signals it receives.
    const source: Source<any> = sink => {
      downstream = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    const result$ = sinks.toPromise(source);
    const settled = result$.then(onResolved);

    expect(pullCount).toBe(1);
    downstream!(push(0));
    expect(pullCount).toBe(2);
    downstream!(push(1));
    downstream!(SignalKind.End);
    // The `then` callback has not run synchronously at this point.
    expect(onResolved).not.toHaveBeenCalled();

    await settled;
    expect(onResolved).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const onResolved = vi.fn();
    const result$ = sinks.toPromise(sources.fromArray([1, 2, 3]));
    await result$.then(onResolved);
    expect(onResolved).toHaveBeenCalledWith(3);
  });
});
231
describe('toAsyncIterable', () => {
  /**
   * Builds a manual source for a single test: it captures the sink it is
   * started with (so the test can push signals itself) and counts the Pull
   * talkback signals it receives. Both are exposed on the returned `probe`.
   */
  const createManualSource = () => {
    const probe: { pulls: number; sink: Sink<any> | null } = {
      pulls: 0,
      sink: null,
    };

    const source: Source<any> = sink => {
      probe.sink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) probe.pulls++;
        })
      );
    };

    return { probe, source };
  };

  it('creates an async iterable mirroring the Wonka source', async () => {
    const { probe, source } = createManualSource();

    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const first$ = iterator.next();

    probe.sink!(push(0));
    expect(await first$).toEqual({ value: 0, done: false });
    expect(probe.pulls).toBe(1);

    probe.sink!(push(1));
    expect(await iterator.next()).toEqual({ value: 1, done: false });
    expect(probe.pulls).toBe(2);

    probe.sink!(SignalKind.End);
    expect(await iterator.next()).toEqual({ done: true });
    expect(probe.pulls).toBe(2);
  });

  it('buffers actively pushed values', async () => {
    const { probe, source } = createManualSource();

    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const first$ = iterator.next();

    // Two values and End are pushed before any result is awaited.
    probe.sink!(push(0));
    probe.sink!(push(1));
    probe.sink!(SignalKind.End);

    expect(probe.pulls).toBe(1);
    expect(await first$).toEqual({ value: 0, done: false });
    expect(await iterator.next()).toEqual({ value: 1, done: false });
    expect(await iterator.next()).toEqual({ done: true });
  });

  it('asynchronously waits for pulled values', async () => {
    const { probe, source } = createManualSource();

    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    // The result of this first call is deliberately neither stored nor awaited.
    iterator.next();
    expect(probe.pulls).toBe(1);

    let didResolve = false;

    const second$ = iterator.next().then(result => {
      didResolve = true;
      return result;
    });

    // After one microtask tick the pending call still has not resolved.
    await Promise.resolve();
    expect(didResolve).toBe(false);

    probe.sink!(push(0));
    probe.sink!(SignalKind.End);
    expect(await second$).toEqual({ value: 0, done: false });
    expect(await iterator.next()).toEqual({ done: true });
  });

  it('supports cancellation via return', async () => {
    let sawClose = false;
    let downstream: Sink<any> | null = null;

    // Manual source that records whether a Close talkback signal arrived.
    const source: Source<any> = sink => {
      downstream = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) sawClose = true;
        })
      );
    };

    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    const first$ = iterator.next();

    downstream!(push(0));
    expect(await first$).toEqual({ value: 0, done: false });
    expect(await iterator.return!()).toEqual({ done: true });

    // A value pushed after `return()` must not surface through `next()`.
    downstream!(push(1));
    expect(await iterator.next()).toEqual({ done: true });

    expect(sawClose).toBeTruthy();
  });

  it('supports for-await-of', async () => {
    let emitted = 0;

    // Answers each Pull with 0, 1, 2 in turn and ends on the fourth Pull.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          if (emitted < 3) {
            sink(push(emitted++));
          } else {
            sink(SignalKind.End);
          }
        })
      );
    };

    const iterable = sinks.toAsyncIterable(source);
    const seen: any[] = [];
    for await (const item of iterable) {
      seen.push(item);
    }

    expect(seen).toEqual([0, 1, 2]);
  });

  it('supports for-await-of with early break', async () => {
    let emitted = 0;
    let sawClose = false;

    // Same counting source as above; any non-Pull talkback signal marks it closed.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            sawClose = true;
            return;
          }

          if (emitted < 3) {
            sink(push(emitted++));
          } else {
            sink(SignalKind.End);
          }
        })
      );
    };

    const iterable = sinks.toAsyncIterable(source);
    for await (const item of iterable) {
      expect(item).toBe(0);
      break;
    }

    expect(sawClose).toBe(true);
  });
});
394
describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const onNext = vi.fn();
    const onComplete = vi.fn();
    let pullCount = 0;
    let downstream: Sink<any> | null = null;

    // Manual source: captures the sink so the test pushes signals itself,
    // and counts the Pull signals it receives.
    const source: Source<any> = sink => {
      downstream = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    const wrapped = Observable.from(observable.toObservable(source) as any);
    wrapped.subscribe({
      next: onNext,
      complete: onComplete,
    });

    expect(pullCount).toBe(1);
    downstream!(push(0));
    expect(onNext).toHaveBeenCalledWith(0);
    downstream!(push(1));
    expect(onNext).toHaveBeenCalledWith(1);
    downstream!(SignalKind.End);
    expect(onComplete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let closeCount = 0;

    // Never emits; only counts the Close talkback signals it receives.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal === TalkbackKind.Close) closeCount++;
      });

      sink(talkback);
    };

    const wrapped = Observable.from(observable.toObservable(source) as any);
    const subscription = wrapped.subscribe({});

    expect(closeCount).toBe(0);
    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });
});
441
describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const onValue = vi.fn();
    let pullCount = 0;
    let downstream: Sink<any> | null = null;

    // Manual source: captures the sink so the test pushes signals itself,
    // and counts the Pull signals it receives.
    const source: Source<any> = sink => {
      downstream = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    const bag = callbag.toCallbag(source);
    callbagIterate(onValue)(bag);

    expect(pullCount).toBe(1);
    downstream!(push(0));
    expect(onValue).toHaveBeenCalledWith(0);
    downstream!(push(1));
    expect(onValue).toHaveBeenCalledWith(1);
    downstream!(SignalKind.End);
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    const onValue = vi.fn();
    let endCount = 0;

    // Answers every Pull with a value and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      const talkback = start(signal => {
        if (signal !== TalkbackKind.Pull) {
          endCount++;
          return;
        }

        sink(push(0));
      });

      sink(talkback);
    };

    // `callbagTake(1)` sits between the source and the consumer.
    const bag = callbag.toCallbag(source) as any;
    const firstOnly = callbagTake(1)(bag);
    callbagIterate(onValue)(firstOnly);

    expect(onValue.mock.calls).toEqual([[0]]);
    expect(endCount).toBe(1);
  });
});