1import { describe, it, expect, beforeEach, vi } from 'vitest';
2
3import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
4import { push, start, talkbackPlaceholder } from '../helpers';
5
6import * as sources from '../sources';
7import * as operators from '../operators';
8import * as callbag from '../callbag';
9import * as observable from '../observable';
10
11import callbagFromArray from 'callbag-from-iter';
12import Observable from 'zen-observable';
13
/**
 * Subscribes to `source` and records every signal it delivers, in order.
 *
 * The sink answers the Start signal and every Push signal with a Pull
 * talkback. The returned array is the live buffer the sink writes to, so
 * signals delivered after this function has returned still show up in it.
 *
 * @param source - The source to subscribe to.
 * @param onStart - Optional hook that is called with the talkback function
 *   when the Start signal arrives, before the first Pull is sent.
 * @returns The array that received signals are appended to.
 */
const collectSignals = (source: Source<any>, onStart?: (talkbackCb: TalkbackFn) => void) => {
  const received: Signal<any>[] = [];
  let talkback = talkbackPlaceholder;

  const sink: Sink<any> = signal => {
    received.push(signal);

    // Nothing is requested once the source has ended
    if (signal === SignalKind.End) return;

    if (signal.tag === SignalKind.Start) {
      talkback = signal[0];
      if (onStart) onStart(talkback);
    }

    // Both Start and Push are answered with a request for the next value
    talkback(TalkbackKind.Pull);
  };

  source(sink);
  return received;
};
32
/* Spec helper: registers a test which answers the Start signal of `source`
   with a Close talkback signal and fails if a Push or End signal ever
   reaches the sink. */
const passesActiveClose = (source: Source<any>) => {
  it('stops emitting when a Close talkback signal is received (spec)', () => {
    let talkback: TalkbackFn | null = null;

    const closeOnStart: Sink<any> = signal => {
      // Neither End nor Push may be delivered to a sink that closes on Start
      expect(signal).not.toBe(SignalKind.End);
      const tag = (signal as any).tag;
      expect(tag).not.toBe(SignalKind.Push);

      if (tag !== SignalKind.Start) return;
      (talkback = signal[0])(TalkbackKind.Close);
    };

    source(closeOnStart);

    // The Start signal must have been delivered during subscription
    expect(talkback).not.toBe(null);
  });
};
48
/* Spec helper: registers a test which checks that `source` delivers a Start
   signal on subscription but no Push signal until a Pull talkback signal has
   been sent, and exactly one Push once it has. The Pull is sent from a timer
   that is flushed with `vi.runAllTimers()`. */
const passesColdPull = (source: Source<any>) => {
  it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
    let pushes = 0;
    let talkback: TalkbackFn | null = null;

    const countingSink: Sink<any> = signal => {
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        pushes += 1;
      } else {
        talkback = signal[0];
      }
    };

    source(countingSink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    // Request the first value only after some time has passed
    setTimeout(() => {
      expect(pushes).toBe(0);
      talkback!(TalkbackKind.Pull);
    }, 10);

    vi.runAllTimers();
    expect(pushes).toBe(1);
  });
};
79
/* Spec helper: registers a test which checks that a Pull sent from inside the
   Push handler does not re-enter the sink before that handler returns (the
   push counter is unchanged right after the Pull), while the Pull sent from
   the Start handler has delivered both values by the time it returns.
   `source` must emit exactly the values 1 and 2 and then end. Recursive
   delivery would grow the call stack with every emitted value. */
const passesTrampoline = (source: Source<any>) => {
  it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
    let pushCount = 0;
    let talkback: TalkbackFn | null = null;
    const received: Signal<any>[] = [];

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        received.push(signal);
        expect(pushCount).toBe(2);
        return;
      }

      if (signal.tag === SignalKind.Push) {
        pushCount += 1;
        const countBeforePull = pushCount;
        received.push(signal);
        talkback!(TalkbackKind.Pull);
        // No further Push may have arrived while the Pull call was running
        expect(countBeforePull).toBe(pushCount);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
        // Both values were delivered before the initial Pull returned
        expect(pushCount).toBe(2);
      }
    };

    source(sink);
    expect(received).toEqual([push(1), push(2), SignalKind.End]);
  });
};
108
// Every test starts with fake timers installed, so the specs below can drive
// timer callbacks via `vi.runAllTimers()` and `vi.advanceTimersByTime()`.
beforeEach(function installFakeTimers() {
  vi.useFakeTimers();
});
112
describe('fromArray', () => {
  // Builds a fresh single-element source for each spec helper
  const singleElement = () => sources.fromArray([0]);

  // The trampoline spec expects exactly the values 1 and 2
  passesTrampoline(sources.fromArray([1, 2]));
  passesColdPull(singleElement());
  passesActiveClose(singleElement());
});
118
describe('fromValue', () => {
  passesColdPull(sources.fromValue(0));
  passesActiveClose(sources.fromValue(0));

  it('sends a single value and ends', () => {
    // Start, then the one value, then End
    const expected = [start(expect.any(Function)), push(1), SignalKind.End];
    const received = collectSignals(sources.fromValue(1));

    expect(received).toEqual(expected);
  });
});
131
describe('merge', () => {
  // One merged source instance shared by both spec helpers
  const specSource = operators.merge<any>([sources.fromValue(0), sources.empty]);

  passesColdPull(specSource);
  passesActiveClose(specSource);

  it('correctly merges two sources where the second is empty', () => {
    const merged = operators.merge<any>([sources.fromValue(0), sources.empty]);
    const received = collectSignals(merged);

    expect(received).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
  });

  it('correctly merges hot sources', () => {
    const startSpy = vi.fn();
    const withStartSpy = operators.onStart(startSpy);
    const merged = operators.merge<any>([
      withStartSpy(sources.never),
      withStartSpy(sources.fromArray([1, 2])),
    ]);

    const received = collectSignals(merged);

    // Both inner sources were started
    expect(startSpy).toHaveBeenCalledTimes(2);
    // No End signal is expected while the `never` source is still open
    expect(received).toEqual([start(expect.any(Function)), push(1), push(2)]);
  });

  it('correctly merges asynchronous sources', () => {
    vi.useFakeTimers();

    const startSpy = vi.fn();
    const withStartSpy = operators.onStart(startSpy);
    const firstTwoTicks = operators.take(2)(sources.interval(50));
    const merged = operators.merge<any>([
      withStartSpy(sources.fromValue(-1)),
      withStartSpy(firstTwoTicks),
    ]);

    const received = collectSignals(merged);

    // Two interval periods of 50ms each
    vi.advanceTimersByTime(100);
    expect(startSpy).toHaveBeenCalledTimes(2);

    expect(received).toEqual([
      start(expect.any(Function)),
      push(-1),
      push(0),
      push(1),
      SignalKind.End,
    ]);
  });
});
179
describe('concat', () => {
  // One concatenated source instance shared by both spec helpers
  const specSource = operators.concat<any>([sources.fromValue(0), sources.empty]);

  passesColdPull(specSource);
  passesActiveClose(specSource);

  it('correctly concats two sources where the second is empty', () => {
    const concatenated = operators.concat<any>([sources.fromValue(0), sources.empty]);
    const received = collectSignals(concatenated);

    expect(received).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
  });
});
192
describe('make', () => {
  it('may be used to create async sources', () => {
    const teardown = vi.fn();
    const source = sources.make(observer => {
      // Emit one value after 10ms and complete after 20ms
      setTimeout(() => {
        observer.next(1);
      }, 10);
      setTimeout(() => {
        observer.complete();
      }, 20);
      return teardown;
    });

    const received = collectSignals(source);

    // Only the Start signal is delivered before any timer has fired
    expect(received).toEqual([start(expect.any(Function))]);

    vi.runAllTimers();
    expect(received).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });

  it('supports active cancellation', () => {
    const teardown = vi.fn();
    const source = sources.make(() => teardown);

    const closeLater: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      expect((signal as any).tag).not.toBe(SignalKind.Push);
      // Send Close from a timer rather than synchronously
      setTimeout(() => {
        signal[0](TalkbackKind.Close);
      });
    };

    source(closeLater);
    expect(teardown).not.toHaveBeenCalled();

    vi.runAllTimers();
    expect(teardown).toHaveBeenCalled();
  });
});
225
describe('makeSubject', () => {
  it('may be used to emit signals programmatically', () => {
    const subject = sources.makeSubject();
    const received = collectSignals(subject.source);

    expect(received).toEqual([start(expect.any(Function))]);

    subject.next(1);
    expect(received).toEqual([start(expect.any(Function)), push(1)]);

    subject.complete();
    expect(received).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });

  it('ignores signals after complete has been called', () => {
    const subject = sources.makeSubject();
    const received = collectSignals(subject.source);

    subject.complete();
    expect(received).toEqual([start(expect.any(Function)), SignalKind.End]);

    // Neither call may add a signal after completion
    subject.next(1);
    subject.complete();
    expect(received).toHaveLength(2);
  });
});
254
describe('never', () => {
  // The assertion below requires that only the Start signal is recorded, i.e.
  // no Push and no End, so the title must not claim that the source ends.
  it('emits nothing and never ends', () => {
    const signals = collectSignals(sources.never);
    expect(signals).toEqual([start(expect.any(Function))]);
  });
});
261
describe('empty', () => {
  it('emits nothing and ends immediately', () => {
    // Start is followed directly by End, with no Push in between
    const expected = [start(expect.any(Function)), SignalKind.End];
    const received = collectSignals(sources.empty);

    expect(received).toEqual(expected);
  });
});
269
describe('fromPromise', () => {
  passesActiveClose(sources.fromPromise(Promise.resolve(null)));

  it('emits a value when the promise resolves', async () => {
    const resolved = Promise.resolve(1);
    const received = collectSignals(sources.fromPromise(resolved));

    // Nothing but Start has been delivered synchronously
    expect(received).toEqual([start(expect.any(Function))]);

    // Yield a few times so the source's promise callbacks get to run
    await Promise.resolve();
    await resolved;
    await Promise.resolve();

    expect(received).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
  });
});
286
describe('fromObservable', () => {
  // These tests wait on a real `setTimeout`, so fake timers are switched off
  beforeEach(() => {
    vi.useRealTimers();
  });

  // Resolves from a `setTimeout` callback scheduled without a delay argument
  const afterTimeout = () => new Promise(resolve => setTimeout(resolve));

  it('converts an Observable to a Wonka source', async () => {
    const converted = observable.fromObservable(Observable.from([1, 2]));
    const received = collectSignals(converted);

    await afterTimeout();

    expect(received).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
  });

  it('supports cancellation on converted Observables', async () => {
    const converted = observable.fromObservable(Observable.from([1, 2]));
    // Close as soon as the Start signal arrives
    const received = collectSignals(converted, talkback => {
      talkback(TalkbackKind.Close);
    });

    await afterTimeout();

    expect(received).toEqual([start(expect.any(Function))]);
  });
});
312
describe('fromCallbag', () => {
  it('converts a Callbag to a Wonka source', () => {
    const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
    const signals = collectSignals(source);

    expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
  });

  // Title previously said "Observables"; this test converts a Callbag.
  it('supports cancellation on converted Callbags', () => {
    const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
    // Close as soon as the Start signal arrives; nothing else may be recorded
    const signals = collectSignals(source, talkback => {
      talkback(TalkbackKind.Close);
    });

    expect(signals).toEqual([start(expect.any(Function))]);
  });
});
330
describe('interval', () => {
  // The talkback signal that stops the source here is `TalkbackKind.Close`,
  // so the title names it "Close" rather than "Cancel".
  it('emits Push signals until Close is sent', () => {
    let pushes = 0;
    let talkback: TalkbackFn | null = null;

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        /*noop*/
      } else if (signal.tag === SignalKind.Push) {
        pushes++;
      } else {
        talkback = signal[0];
      }
    };

    sources.interval(100)(sink);
    expect(talkback).not.toBe(null);
    expect(pushes).toBe(0);

    // One Push per elapsed 100ms period; no Pull is sent by this sink
    vi.advanceTimersByTime(100);
    expect(pushes).toBe(1);
    vi.advanceTimersByTime(100);
    expect(pushes).toBe(2);

    // After Close, a further period must not produce another Push
    talkback!(TalkbackKind.Close);
    vi.advanceTimersByTime(100);
    expect(pushes).toBe(2);
  });
});
360
describe('fromDomEvent', () => {
  // NOTE(review): the title mentions Push signals, but this test only asserts
  // listener registration and removal; no Push signal is checked.
  it('emits Push signals for events on a DOM element', () => {
    let talkback: TalkbackFn | null = null;

    // Minimal stand-in exposing only the two methods this test inspects
    const addEventListener = vi.fn();
    const removeEventListener = vi.fn();
    const element = { addEventListener, removeEventListener };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      if ((signal as any).tag !== SignalKind.Start) return;
      talkback = signal[0];
    };

    sources.fromDomEvent(element as any, 'click')(sink);

    expect(addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
    expect(removeEventListener).not.toHaveBeenCalled();

    // Second argument of the first `addEventListener` call
    const listener = addEventListener.mock.calls[0][1];

    listener(1);
    listener(2);

    // Closing must remove the same listener that was registered
    talkback!(TalkbackKind.Close);
    expect(removeEventListener).toHaveBeenCalledWith('click', listener);
  });
});