1import { Source, Sink, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
2import { push, start, talkbackPlaceholder } from '../helpers';
3
4import * as sources from '../sources';
5import * as operators from '../operators';
6import * as callbag from '../callbag';
7import * as observable from '../observable';
8
9import callbagFromArray from 'callbag-from-iter';
10import Observable from 'zen-observable';
11
12const collectSignals = (source: Source<any>, onStart?: (talkbackCb: TalkbackFn) => void) => {
13 let talkback = talkbackPlaceholder;
14 const signals: Signal<any>[] = [];
15 source(signal => {
16 signals.push(signal);
17 if (signal === SignalKind.End) {
18 /*noop*/
19 } else if (signal.tag === SignalKind.Start) {
20 talkback = signal[0];
21 if (onStart) onStart(talkback);
22 talkback(TalkbackKind.Pull);
23 } else {
24 talkback(TalkbackKind.Pull);
25 }
26 });
27
28 return signals;
29};
30
31/* When a Close talkback signal is sent the source should immediately end */
32const passesActiveClose = (source: Source<any>) => {
33 it('stops emitting when a Close talkback signal is received (spec)', () => {
34 let talkback: TalkbackFn | null = null;
35 const sink: Sink<any> = signal => {
36 expect(signal).not.toBe(SignalKind.End);
37 expect((signal as any).tag).not.toBe(SignalKind.Push);
38 if ((signal as any).tag === SignalKind.Start) {
39 (talkback = signal[0])(TalkbackKind.Close);
40 }
41 };
42 source(sink);
43 expect(talkback).not.toBe(null);
44 });
45};
46
47/* All synchronous, cold sources won't send anything unless a Pull signal
48 has been received. */
49const passesColdPull = (source: Source<any>) => {
50 it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
51 let talkback: TalkbackFn | null = null;
52 let pushes = 0;
53
54 const sink: Sink<any> = signal => {
55 if (signal === SignalKind.End) {
56 /*noop*/
57 } else if (signal.tag === SignalKind.Push) {
58 pushes++;
59 } else {
60 talkback = signal[0];
61 }
62 };
63
64 source(sink);
65 expect(talkback).not.toBe(null);
66 expect(pushes).toBe(0);
67
68 setTimeout(() => {
69 expect(pushes).toBe(0);
70 talkback!(TalkbackKind.Pull);
71 }, 10);
72
73 jest.runAllTimers();
74 expect(pushes).toBe(1);
75 });
76};
77
78/* All synchronous, cold sources need to use trampoline scheduling to avoid
79 recursively sending more and more Push signals which would eventually lead
80 to a call stack overflow when too many values are emitted. */
81const passesTrampoline = (source: Source<any>) => {
82 it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
83 let talkback: TalkbackFn | null = null;
84 let pushes = 0;
85
86 const signals: Signal<any>[] = [];
87 const sink: Sink<any> = signal => {
88 if (signal === SignalKind.End) {
89 signals.push(signal);
90 expect(pushes).toBe(2);
91 } else if (signal.tag === SignalKind.Push) {
92 const lastPushes = ++pushes;
93 signals.push(signal);
94 talkback!(TalkbackKind.Pull);
95 expect(lastPushes).toBe(pushes);
96 } else if (signal.tag === SignalKind.Start) {
97 (talkback = signal[0])(TalkbackKind.Pull);
98 expect(pushes).toBe(2);
99 }
100 };
101
102 source(sink);
103 expect(signals).toEqual([push(1), push(2), SignalKind.End]);
104 });
105};
106
107beforeEach(() => {
108 jest.useFakeTimers();
109});
110
111describe('fromArray', () => {
112 passesTrampoline(sources.fromArray([1, 2]));
113 passesColdPull(sources.fromArray([0]));
114 passesActiveClose(sources.fromArray([0]));
115});
116
117describe('fromValue', () => {
118 passesColdPull(sources.fromValue(0));
119 passesActiveClose(sources.fromValue(0));
120
121 it('sends a single value and ends', () => {
122 expect(collectSignals(sources.fromValue(1))).toEqual([
123 start(expect.any(Function)),
124 push(1),
125 SignalKind.End,
126 ]);
127 });
128});
129
130describe('merge', () => {
131 const source = operators.merge<any>([sources.fromValue(0), sources.empty]);
132
133 passesColdPull(source);
134 passesActiveClose(source);
135
136 it('correctly merges two sources where the second is empty', () => {
137 const source = operators.merge<any>([sources.fromValue(0), sources.empty]);
138
139 expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
140 });
141
142 it('correctly merges hot sources', () => {
143 const onStart = jest.fn();
144 const source = operators.merge<any>([
145 operators.onStart(onStart)(sources.never),
146 operators.onStart(onStart)(sources.fromArray([1, 2])),
147 ]);
148
149 const signals = collectSignals(source);
150 expect(onStart).toHaveBeenCalledTimes(2);
151
152 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2)]);
153 });
154
155 it('correctly merges asynchronous sources', () => {
156 jest.useFakeTimers();
157
158 const onStart = jest.fn();
159 const source = operators.merge<any>([
160 operators.onStart(onStart)(sources.fromValue(-1)),
161 operators.onStart(onStart)(operators.take(2)(sources.interval(50))),
162 ]);
163
164 const signals = collectSignals(source);
165 jest.advanceTimersByTime(100);
166 expect(onStart).toHaveBeenCalledTimes(2);
167
168 expect(signals).toEqual([
169 start(expect.any(Function)),
170 push(-1),
171 push(0),
172 push(1),
173 SignalKind.End,
174 ]);
175 });
176});
177
178describe('concat', () => {
179 const source = operators.concat<any>([sources.fromValue(0), sources.empty]);
180
181 passesColdPull(source);
182 passesActiveClose(source);
183
184 it('correctly concats two sources where the second is empty', () => {
185 const source = operators.concat<any>([sources.fromValue(0), sources.empty]);
186
187 expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
188 });
189});
190
191describe('make', () => {
192 it('may be used to create async sources', () => {
193 const teardown = jest.fn();
194 const source = sources.make(observer => {
195 setTimeout(() => observer.next(1), 10);
196 setTimeout(() => observer.complete(), 20);
197 return teardown;
198 });
199
200 const signals = collectSignals(source);
201 expect(signals).toEqual([start(expect.any(Function))]);
202 jest.runAllTimers();
203
204 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
205 });
206
207 it('supports active cancellation', () => {
208 const teardown = jest.fn();
209 const source = sources.make(() => teardown);
210
211 const sink: Sink<any> = signal => {
212 expect(signal).not.toBe(SignalKind.End);
213 expect((signal as any).tag).not.toBe(SignalKind.Push);
214 setTimeout(() => signal[0](TalkbackKind.Close));
215 };
216
217 source(sink);
218 expect(teardown).not.toHaveBeenCalled();
219 jest.runAllTimers();
220 expect(teardown).toHaveBeenCalled();
221 });
222});
223
224describe('makeSubject', () => {
225 it('may be used to emit signals programmatically', () => {
226 const { source, next, complete } = sources.makeSubject();
227 const signals = collectSignals(source);
228
229 expect(signals).toEqual([start(expect.any(Function))]);
230
231 next(1);
232
233 expect(signals).toEqual([start(expect.any(Function)), push(1)]);
234
235 complete();
236
237 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
238 });
239
240 it('ignores signals after complete has been called', () => {
241 const { source, next, complete } = sources.makeSubject();
242 const signals = collectSignals(source);
243 complete();
244
245 expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);
246
247 next(1);
248 complete();
249 expect(signals.length).toBe(2);
250 });
251});
252
253describe('never', () => {
254 it('emits nothing and ends immediately', () => {
255 const signals = collectSignals(sources.never);
256 expect(signals).toEqual([start(expect.any(Function))]);
257 });
258});
259
260describe('empty', () => {
261 it('emits nothing and ends immediately', () => {
262 const signals = collectSignals(sources.empty);
263
264 expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);
265 });
266});
267
268describe('fromPromise', () => {
269 passesActiveClose(sources.fromPromise(Promise.resolve(null)));
270
271 it('emits a value when the promise resolves', async () => {
272 const promise = Promise.resolve(1);
273 const signals = collectSignals(sources.fromPromise(promise));
274
275 expect(signals).toEqual([start(expect.any(Function))]);
276
277 await Promise.resolve();
278 await promise;
279 await Promise.resolve();
280
281 expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
282 });
283});
284
285describe('fromObservable', () => {
286 beforeEach(() => {
287 jest.useRealTimers();
288 });
289
290 it('converts an Observable to a Wonka source', async () => {
291 const source = observable.fromObservable(Observable.from([1, 2]));
292 const signals = collectSignals(source);
293
294 await new Promise(resolve => setTimeout(resolve));
295
296 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
297 });
298
299 it('supports cancellation on converted Observables', async () => {
300 const source = observable.fromObservable(Observable.from([1, 2]));
301 const signals = collectSignals(source, talkback => {
302 talkback(TalkbackKind.Close);
303 });
304
305 await new Promise(resolve => setTimeout(resolve));
306
307 expect(signals).toEqual([start(expect.any(Function))]);
308 });
309});
310
311describe('fromCallbag', () => {
312 it('converts a Callbag to a Wonka source', () => {
313 const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
314 const signals = collectSignals(source);
315
316 expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
317 });
318
319 it('supports cancellation on converted Observables', () => {
320 const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
321 const signals = collectSignals(source, talkback => {
322 talkback(TalkbackKind.Close);
323 });
324
325 expect(signals).toEqual([start(expect.any(Function))]);
326 });
327});
328
329describe('interval', () => {
330 it('emits Push signals until Cancel is sent', () => {
331 let pushes = 0;
332 let talkback: TalkbackFn | null = null;
333
334 const sink: Sink<any> = signal => {
335 if (signal === SignalKind.End) {
336 /*noop*/
337 } else if (signal.tag === SignalKind.Push) {
338 pushes++;
339 } else {
340 talkback = signal[0];
341 }
342 };
343
344 sources.interval(100)(sink);
345 expect(talkback).not.toBe(null);
346 expect(pushes).toBe(0);
347
348 jest.advanceTimersByTime(100);
349 expect(pushes).toBe(1);
350 jest.advanceTimersByTime(100);
351 expect(pushes).toBe(2);
352
353 talkback!(TalkbackKind.Close);
354 jest.advanceTimersByTime(100);
355 expect(pushes).toBe(2);
356 });
357});
358
359describe('fromDomEvent', () => {
360 it('emits Push signals for events on a DOM element', () => {
361 let talkback: TalkbackFn | null = null;
362
363 const element = {
364 addEventListener: jest.fn(),
365 removeEventListener: jest.fn(),
366 };
367
368 const sink: Sink<any> = signal => {
369 expect(signal).not.toBe(SignalKind.End);
370 if ((signal as any).tag === SignalKind.Start) talkback = signal[0];
371 };
372
373 sources.fromDomEvent(element as any, 'click')(sink);
374
375 expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
376 expect(element.removeEventListener).not.toHaveBeenCalled();
377 const listener = element.addEventListener.mock.calls[0][1];
378
379 listener(1);
380 listener(2);
381 talkback!(TalkbackKind.Close);
382 expect(element.removeEventListener).toHaveBeenCalledWith('click', listener);
383 });
384});