1import { describe, it, expect, vi } from 'vitest';
2
3import { Source, Sink, SignalKind, TalkbackKind } from '../types';
4import { push, start } from '../helpers';
5
6import * as sinks from '../sinks';
7import * as sources from '../sources';
8import * as callbag from '../callbag';
9import * as observable from '../observable';
10
11import Observable from 'zen-observable';
12import callbagIterate from 'callbag-iterate';
13import callbagTake from 'callbag-take';
14
// Exercises the `subscribe` sink against hand-written sources.
describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const onValue = vi.fn();
    let pullCount = 0;

    // Answers the first three Pull signals with a value and any later Pull with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          if (pullCount >= 3) {
            sink(SignalKind.End);
            expect(pullCount).toBe(3);
            return;
          }

          pullCount++;
          sink(push(0));
        })
      );
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).toHaveBeenCalledTimes(3);
    expect(pullCount).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Pushes a single value on the first Pull and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount++;
          } else if (pullCount === 0) {
            pullCount++;
            sink(push(0));
          }
        })
      );
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);

    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Ends as soon as it is pulled and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount++;
            return;
          }

          pullCount++;
          sink(SignalKind.End);
        })
      );
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);
    subscription.unsubscribe();
    expect(closeCount).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const onValue = vi.fn();

    // Misbehaving source: still pushes a value right after sending End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          sink(SignalKind.End);
          sink(push(0));
        })
      );
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const onValue = vi.fn();

    // Misbehaving source: pushes a value in response to the Close signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Close) return;
          sink(push(0));
        })
      );
    };

    const subscription = sinks.subscribe(onValue)(source);
    subscription.unsubscribe();
    expect(onValue).not.toHaveBeenCalled();
  });
});
123
// Exercises the `publish` sink against a hand-written source.
describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers the first three Pull signals with a value and any later Pull with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          if (pullCount >= 3) {
            sink(SignalKind.End);
            expect(pullCount).toBe(3);
            return;
          }

          pullCount++;
          sink(push(0));
        })
      );
    };

    sinks.publish(source);
    expect(pullCount).toBe(3);
  });
});
147
// Exercises the `toArray` sink against hand-written synchronous sources.
describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers the first three Pull signals with a value and any later Pull with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          if (pullCount >= 3) {
            sink(SignalKind.End);
            expect(pullCount).toBe(3);
            return;
          }

          pullCount++;
          sink(push(0));
        })
      );
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0, 0, 0]);
    expect(pullCount).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Pushes one value on the first Pull, never ends, and counts non-Pull talkback signals.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount++;
          } else if (pullCount === 0) {
            pullCount++;
            sink(push(0));
          }
        })
      );
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0]);
    expect(closeCount).toBe(1);
  });
});
196
// Exercises the `toPromise` sink with a manually driven source and a synchronous one.
describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    let pullCount = 0;
    let capturedSink: Sink<any> | null = null;

    // Hands its sink to the test so values can be pushed by hand; only counts Pull signals.
    const source: Source<any> = sink => {
      capturedSink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    const onResolved = vi.fn();
    const settled = sinks.toPromise(source).then(onResolved);

    expect(pullCount).toBe(1);
    capturedSink!(push(0));
    expect(pullCount).toBe(2);
    capturedSink!(push(1));
    capturedSink!(SignalKind.End);
    // The `then` callback has not run yet in this synchronous stretch.
    expect(onResolved).not.toHaveBeenCalled();

    await settled;
    expect(onResolved).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const onResolved = vi.fn();
    const lastValue = await sinks.toPromise(sources.fromArray([1, 2, 3]));
    onResolved(lastValue);
    expect(onResolved).toHaveBeenCalledWith(3);
  });
});
231
// Exercises the `toAsyncIterable` sink with manually driven and pull-driven sources.
describe('toAsyncIterable', () => {
  /**
   * Builds a source that hands its sink to the test and counts Pull signals,
   * so each test can push values and end the stream by hand.
   */
  const createManualSource = () => {
    const probe: { pulls: number; sink: Sink<any> | null } = {
      pulls: 0,
      sink: null,
    };

    const source: Source<any> = sink => {
      probe.sink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) probe.pulls++;
        })
      );
    };

    return { probe, source };
  };

  it('creates an async iterable mirroring the Wonka source', async () => {
    const { probe, source } = createManualSource();
    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();

    expect(probe.pulls).toBe(1);
    probe.sink!(push(0));
    expect(await iterator.next()).toEqual({ value: 0, done: false });
    expect(probe.pulls).toBe(2);

    probe.sink!(push(1));
    expect(await iterator.next()).toEqual({ value: 1, done: false });
    expect(probe.pulls).toBe(3);

    probe.sink!(SignalKind.End);
    expect(await iterator.next()).toEqual({ done: true });
    expect(probe.pulls).toBe(3);
  });

  it('buffers actively pushed values', async () => {
    const { probe, source } = createManualSource();
    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();

    // Everything is pushed before the iterator is read for the first time.
    probe.sink!(push(0));
    probe.sink!(push(1));
    probe.sink!(SignalKind.End);

    expect(probe.pulls).toBe(1);
    expect(await iterator.next()).toEqual({ value: 0, done: false });
    expect(await iterator.next()).toEqual({ value: 1, done: false });
    expect(await iterator.next()).toEqual({ done: true });
  });

  it('asynchronously waits for pulled values', async () => {
    const { probe, source } = createManualSource();
    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
    expect(probe.pulls).toBe(1);

    let settled = false;

    const pending = iterator.next().then(result => {
      settled = true;
      return result;
    });

    // After one microtask tick the request must still be pending: nothing was pushed yet.
    await Promise.resolve();
    expect(settled).toBe(false);

    probe.sink!(push(0));
    probe.sink!(SignalKind.End);
    expect(await pending).toEqual({ value: 0, done: false });
    expect(await iterator.next()).toEqual({ done: true });
  });

  it('supports cancellation via return', async () => {
    let sawClose = false;
    let capturedSink: Sink<any> | null = null;

    // Hands its sink to the test and records whether a Close signal arrived.
    const source: Source<any> = sink => {
      capturedSink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) sawClose = true;
        })
      );
    };

    const iterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();

    capturedSink!(push(0));
    expect(await iterator.next()).toEqual({ value: 0, done: false });
    expect(await iterator.return!()).toEqual({ done: true });

    // A value pushed after `return()` must not be yielded by the iterator.
    capturedSink!(push(1));
    expect(await iterator.next()).toEqual({ done: true });

    expect(sawClose).toBeTruthy();
  });

  it('supports for-await-of', async () => {
    let pullCount = 0;

    // Answers Pull signals with 0, 1, 2 in turn and then with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;

          if (pullCount < 3) {
            sink(push(pullCount++));
          } else {
            sink(SignalKind.End);
          }
        })
      );
    };

    const collected: any[] = [];
    for await (const item of sinks.toAsyncIterable(source)) {
      collected.push(item);
    }

    expect(collected).toEqual([0, 1, 2]);
  });

  it('supports for-await-of with early break', async () => {
    let pullCount = 0;
    let sawClose = false;

    // Same counting source as above, additionally flagging any non-Pull talkback signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            sawClose = true;
          } else if (pullCount < 3) {
            sink(push(pullCount++));
          } else {
            sink(SignalKind.End);
          }
        })
      );
    };

    for await (const first of sinks.toAsyncIterable(source)) {
      expect(first).toBe(0);
      break;
    }

    expect(sawClose).toBe(true);
  });
});
391
// Exercises `toObservable` by consuming its result through zen-observable.
describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const onNext = vi.fn();
    const onComplete = vi.fn();
    let pullCount = 0;
    let capturedSink: Sink<any> | null = null;

    // Hands its sink to the test so values can be pushed by hand; only counts Pull signals.
    const source: Source<any> = sink => {
      capturedSink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    Observable.from(observable.toObservable(source) as any).subscribe({
      next: onNext,
      complete: onComplete,
    });

    expect(pullCount).toBe(1);
    capturedSink!(push(0));
    expect(onNext).toHaveBeenCalledWith(0);
    capturedSink!(push(1));
    expect(onNext).toHaveBeenCalledWith(1);
    capturedSink!(SignalKind.End);
    expect(onComplete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let closeCount = 0;

    // Never emits anything; only counts Close signals.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Close) return;
          closeCount++;
        })
      );
    };

    const subscription = Observable.from(observable.toObservable(source) as any).subscribe({});

    expect(closeCount).toBe(0);
    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });
});
438
// Exercises `toCallbag` by consuming its result with callbag-iterate and callbag-take.
describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const onValue = vi.fn();
    let pullCount = 0;
    let capturedSink: Sink<any> | null = null;

    // Hands its sink to the test so values can be pushed by hand; only counts Pull signals.
    const source: Source<any> = sink => {
      capturedSink = sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount++;
        })
      );
    };

    callbagIterate(onValue)(callbag.toCallbag(source));

    expect(pullCount).toBe(1);
    capturedSink!(push(0));
    expect(onValue).toHaveBeenCalledWith(0);
    capturedSink!(push(1));
    expect(onValue).toHaveBeenCalledWith(1);
    capturedSink!(SignalKind.End);
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let closeCount = 0;
    const onValue = vi.fn();

    // Answers every Pull with a value and counts every non-Pull talkback signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount++;
            return;
          }

          sink(push(0));
        })
      );
    };

    // Taking a single value lets the callbag side cancel the source after the first push.
    callbagIterate(onValue)(callbagTake(1)(callbag.toCallbag(source) as any));

    expect(onValue.mock.calls).toEqual([[0]]);
    expect(closeCount).toBe(1);
  });
});