1import * as deriving from './helpers/wonka_deriving';
2import * as sources from './wonka_sources.gen';
3import * as operators from './wonka_operators.gen';
4import * as types from './wonka_types.gen';
5import * as web from './web/wonkaJs.gen';
6
7import callbagFromArray from 'callbag-from-iter';
8import Observable from 'zen-observable';
9
/**
 * Subscribes to `source` and records every signal it delivers.
 *
 * The sink drives the source itself: it sends a Pull talkback signal when the
 * Start signal arrives and again after every Push signal.
 *
 * @param source  The source to subscribe to.
 * @param onStart Optional hook invoked with the talkback function as soon as
 *                the Start signal arrives, before the first Pull is sent.
 * @returns The array of received signals. It is the same array the sink keeps
 *          appending to, so signals that arrive later also show up in it.
 */
const collectSignals = (
  source: types.sourceT<any>,
  onStart?: (talkbackCb: (tb: types.talkbackT) => void) => void
) => {
  const signals = [];
  let talkback = null;

  const sink: types.sinkT<any> = signal => {
    signals.push(signal);

    const started = deriving.isStart(signal);
    if (started) {
      talkback = deriving.unboxStart(signal);
      if (onStart) onStart(talkback);
    }

    // NOTE(review): a Pull is sent even when `onStart` has just sent Close;
    // the cancellation tests appear to rely on sources ignoring it.
    if (started || deriving.isPush(signal)) {
      talkback(deriving.pull);
    }
  };

  source(sink);
  return signals;
};
30
/* Spec: once the sink answers with a Close talkback signal, the source has to
   stop immediately, i.e. no Push and no End signal may follow. */
const passesActiveClose = (source: types.sourceT<any>) => {
  it('stops emitting when a Close talkback signal is received (spec)', () => {
    let talkback = null;

    source(signal => {
      // Only the Start signal is acceptable; anything else fails the spec.
      expect(deriving.isPush(signal)).toBeFalsy();
      expect(deriving.isEnd(signal)).toBeFalsy();

      if (!deriving.isStart(signal)) return;

      // Close the source right away, before pulling any value.
      talkback = deriving.unboxStart(signal);
      talkback(deriving.close);
    });

    // Guards against a source that never sent a Start signal at all.
    expect(talkback).not.toBe(null);
  });
};
48
/* Spec: synchronous, cold sources must stay silent until the sink has sent a
   Pull talkback signal. */
const passesColdPull = (source: types.sourceT<any>) => {
  it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
    let talkback = null;
    let pushCount = 0;

    source(signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        pushCount += 1;
      }
    });

    // Subscribing hands over a talkback but must not produce any value yet.
    expect(talkback).not.toBe(null);
    expect(pushCount).toBe(0);

    // Pull the first value only after a (fake-timer) delay has elapsed.
    const pullLater = () => {
      expect(pushCount).toBe(0);
      talkback(deriving.pull);
    };
    setTimeout(pullLater, 10);

    jest.runAllTimers();
    expect(pushCount).toBe(1);
  });
};
76
/* Spec: synchronous, cold sources have to use trampoline scheduling. Pulling
   from inside a Push handler must not recursively trigger the next Push,
   otherwise long sources would eventually overflow the call stack. */
const passesTrampoline = (source: types.sourceT<any>) => {
  it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
    const received = [];
    let pushCount = 0;
    let talkback = null;

    source(signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
        talkback(deriving.pull);
        // The first Pull is expected to drain both values before returning.
        expect(pushCount).toBe(2);
        return;
      }

      if (deriving.isPush(signal)) {
        pushCount += 1;
        const countAtPush = pushCount;
        received.push(signal);
        talkback(deriving.pull);
        // A recursive source would have bumped the counter during the Pull.
        expect(countAtPush).toBe(pushCount);
        return;
      }

      if (deriving.isEnd(signal)) {
        received.push(signal);
        expect(pushCount).toBe(2);
      }
    });

    const expected = [deriving.push(1), deriving.push(2), deriving.end()];
    expect(received).toEqual(expected);
  });
};
110
// Every test starts with Jest's fake timers installed; suites that need real
// timers opt out in their own `beforeEach`.
beforeEach(function installFakeTimers() {
  jest.useFakeTimers();
});
114
// `fromArray` is checked against all three shared source specs.
describe('fromArray', () => {
  const twoValues = sources.fromArray([1, 2]);
  const forColdPull = sources.fromArray([0]);
  const forActiveClose = sources.fromArray([0]);

  passesTrampoline(twoValues);
  passesColdPull(forColdPull);
  passesActiveClose(forActiveClose);
});
120
// `fromList` is checked against all three shared source specs. The inputs are
// cast to `any` because they are handed over as raw list values.
describe('fromList', () => {
  const twoValues = sources.fromList([1, [2]] as any);
  const forColdPull = sources.fromList([0] as any);
  const forActiveClose = sources.fromList([0] as any);

  passesTrampoline(twoValues);
  passesColdPull(forColdPull);
  passesActiveClose(forActiveClose);
});
126
// `fromValue` must satisfy the cold-pull and active-close specs and emit
// exactly one value before ending.
describe('fromValue', () => {
  passesColdPull(sources.fromValue(0));
  passesActiveClose(sources.fromValue(0));

  it('sends a single value and ends', () => {
    const signals = collectSignals(sources.fromValue(1));
    const expected = [
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.end(),
    ];

    expect(signals).toEqual(expected);
  });
});
139
// Tests for the `merge` operator with synchronous, never-ending and
// timer-based inputs.
describe('merge', () => {
  // Builds a fresh merge of a single-value source and an empty source.
  const mergeValueWithEmpty = () =>
    operators.merge<any>([sources.fromValue(0), sources.empty]);

  // Matcher for a Start signal carrying any talkback function.
  const anyStart = () => deriving.start(expect.any(Function));

  const source = mergeValueWithEmpty();
  passesColdPull(source);
  passesActiveClose(source);

  it('correctly merges two sources where the second is empty', () => {
    const signals = collectSignals(mergeValueWithEmpty());
    const expected = [anyStart(), deriving.push(0), deriving.end()];

    expect(signals).toEqual(expected);
  });

  it('correctly merges hot sources', () => {
    const startSpy = jest.fn();
    const merged = operators.merge<any>([
      operators.onStart(startSpy)(sources.never),
      operators.onStart(startSpy)(sources.fromArray([1, 2])),
    ]);

    const signals = collectSignals(merged);

    // Both inner sources must have been started.
    expect(startSpy).toHaveBeenCalledTimes(2);

    // No End signal is expected here: `never` keeps the merge open.
    const expected = [anyStart(), deriving.push(1), deriving.push(2)];
    expect(signals).toEqual(expected);
  });

  it('correctly merges asynchronous sources', () => {
    jest.useFakeTimers();

    const startSpy = jest.fn();
    const merged = operators.merge<any>([
      operators.onStart(startSpy)(sources.fromValue(-1)),
      operators.onStart(startSpy)(operators.take(2)(web.interval(50))),
    ]);

    const signals = collectSignals(merged);

    // 100ms of fake time covers two ticks of the 50ms interval.
    jest.advanceTimersByTime(100);
    expect(startSpy).toHaveBeenCalledTimes(2);

    const expected = [
      anyStart(),
      deriving.push(-1),
      deriving.push(0),
      deriving.push(1),
      deriving.end(),
    ];
    expect(signals).toEqual(expected);
  });
});
203
// Tests for the `concat` operator over a single-value source followed by an
// empty one.
describe('concat', () => {
  // Builds a fresh concatenation of a single-value source and an empty source.
  const concatValueWithEmpty = () =>
    operators.concat<any>([sources.fromValue(0), sources.empty]);

  const source = concatValueWithEmpty();
  passesColdPull(source);
  passesActiveClose(source);

  it('correctly concats two sources where the second is empty', () => {
    const signals = collectSignals(concatValueWithEmpty());
    const expected = [
      deriving.start(expect.any(Function)),
      deriving.push(0),
      deriving.end(),
    ];

    expect(signals).toEqual(expected);
  });
});
226
// Tests for `make`, which builds a source from an observer callback that
// returns a teardown function.
describe('make', () => {
  it('may be used to create async sources', () => {
    const teardown = jest.fn();
    const source = sources.make(observer => {
      // Emit one value after 10ms and complete after 20ms (fake time).
      setTimeout(() => {
        observer.next(1);
      }, 10);
      setTimeout(() => {
        observer.complete();
      }, 20);
      return teardown;
    });

    const anyStart = () => deriving.start(expect.any(Function));
    const signals = collectSignals(source);

    // Before any timer fires, only the Start signal has been delivered.
    expect(signals).toEqual([anyStart()]);

    jest.runAllTimers();

    const expected = [anyStart(), deriving.push(1), deriving.end()];
    expect(signals).toEqual(expected);
  });

  it('supports active cancellation', () => {
    const teardown = jest.fn();
    const source = sources.make(() => teardown);

    source(signal => {
      expect(deriving.isPush(signal)).toBeFalsy();
      expect(deriving.isEnd(signal)).toBeFalsy();

      if (!deriving.isStart(signal)) return;

      // Send Close asynchronously so the teardown can be observed afterwards.
      setTimeout(() => {
        const talkback = deriving.unboxStart(signal);
        talkback(deriving.close);
      });
    });

    expect(teardown).not.toHaveBeenCalled();
    jest.runAllTimers();
    expect(teardown).toHaveBeenCalled();
  });
});
264
// Tests for `makeSubject`, whose `next` and `complete` functions drive the
// source it returns.
describe('makeSubject', () => {
  // Matcher for a Start signal carrying any talkback function.
  const anyStart = () => deriving.start(expect.any(Function));

  it('may be used to emit signals programmatically', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);
    const expected = [anyStart()];

    // Subscribing alone yields only the Start signal.
    expect(signals).toEqual(expected);

    next(1);
    expected.push(deriving.push(1));
    expect(signals).toEqual(expected);

    complete();
    expected.push(deriving.end());
    expect(signals).toEqual(expected);
  });

  it('ignores signals after complete has been called', () => {
    const { source, next, complete } = sources.makeSubject();
    const signals = collectSignals(source);

    complete();
    expect(signals).toEqual([anyStart(), deriving.end()]);

    // Neither a further value nor a second completion may reach the sink.
    next(1);
    complete();
    expect(signals).toHaveLength(2);
  });
});
305
// `never` only starts: the test asserts that a Start signal is the sole signal
// received, i.e. no Push and, unlike `empty`, no End.
describe('never', () => {
  // The previous title ("emits nothing and ends immediately") was copied from
  // the `empty` suite and contradicted the assertion below.
  it('emits nothing and never ends', () => {
    const signals = collectSignals(sources.never);
    expect(signals).toEqual([deriving.start(expect.any(Function))]);
  });
});
312
// `empty` starts and then ends right away without pushing any value.
describe('empty', () => {
  it('emits nothing and ends immediately', () => {
    const expected = [deriving.start(expect.any(Function)), deriving.end()];

    expect(collectSignals(sources.empty)).toEqual(expected);
  });
});
323
// Tests for `fromPromise`, which turns a promise into a single-value source.
describe('fromPromise', () => {
  const resolvedWithNull = Promise.resolve(null);
  passesActiveClose(web.fromPromise(resolvedWithNull));

  it('emits a value when the promise resolves', async () => {
    const anyStart = () => deriving.start(expect.any(Function));
    const promise = Promise.resolve(1);
    const signals = collectSignals(web.fromPromise(promise));

    // Resolution is asynchronous, so only Start is visible synchronously.
    expect(signals).toEqual([anyStart()]);

    await promise;

    const expected = [anyStart(), deriving.push(1), deriving.end()];
    expect(signals).toEqual(expected);
  });
});
344
// Tests for `fromObservable`. These run with real timers because each test
// waits for an actual timeout before asserting.
describe('fromObservable', () => {
  beforeEach(() => {
    jest.useRealTimers();
  });

  // Resolves after a zero-delay timeout, giving the Observable time to emit.
  const nextTimeout = () => new Promise(resolve => setTimeout(resolve));

  it('converts an Observable to a Wonka source', async () => {
    const observable = Observable.from([1, 2]);
    const signals = collectSignals(web.fromObservable(observable));

    await nextTimeout();

    const expected = [
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.push(2),
      deriving.end(),
    ];
    expect(signals).toEqual(expected);
  });

  it('supports cancellation on converted Observables', async () => {
    const observable = Observable.from([1, 2]);
    // Close the source as soon as it has started.
    const closeOnStart = talkback => {
      talkback(deriving.close);
    };
    const signals = collectSignals(web.fromObservable(observable), closeOnStart);

    await nextTimeout();

    // Nothing but the Start signal may have been delivered.
    expect(signals).toEqual([deriving.start(expect.any(Function))]);
  });
});
377
// Tests for `fromCallbag`, which adapts a Callbag into a Wonka source.
describe('fromCallbag', () => {
  it('converts a Callbag to a Wonka source', () => {
    const source = web.fromCallbag(callbagFromArray([1, 2]));
    const signals = collectSignals(source);

    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
      deriving.push(1),
      deriving.push(2),
      deriving.end(),
    ]);
  });

  // The previous title said "converted Observables", copied from the
  // `fromObservable` suite; this test exercises a Callbag.
  it('supports cancellation on converted Callbags', () => {
    const source = web.fromCallbag(callbagFromArray([1, 2]));
    // Close the source as soon as it has started.
    const signals = collectSignals(source, talkback => {
      talkback(deriving.close);
    });

    // Nothing but the Start signal may have been delivered.
    expect(signals).toEqual([
      deriving.start(expect.any(Function)),
    ]);
  });
});
402
// Tests for `interval`, driven by Jest's fake timers.
describe('interval', () => {
  it('emits Push signals until Cancel is sent', () => {
    let talkback = null;
    let pushCount = 0;

    const sink: types.sinkT<any> = signal => {
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      } else if (deriving.isPush(signal)) {
        pushCount += 1;
      }
    };

    web.interval(100)(sink);
    expect(talkback).not.toBe(null);
    expect(pushCount).toBe(0);

    // Each elapsed 100ms period must yield exactly one more Push signal.
    for (const expectedCount of [1, 2]) {
      jest.advanceTimersByTime(100);
      expect(pushCount).toBe(expectedCount);
    }

    // After Close, further elapsed time must not produce another Push.
    talkback(deriving.close);
    jest.advanceTimersByTime(100);
    expect(pushCount).toBe(2);
  });
});
430
// Tests for `fromDomEvent` using a stub element that records listener
// registration and removal.
describe('fromDomEvent', () => {
  it('emits Push signals for events on a DOM element', () => {
    const addEventListener = jest.fn();
    const removeEventListener = jest.fn();
    const element = { addEventListener, removeEventListener };

    let talkback = null;
    const sink: types.sinkT<any> = signal => {
      expect(deriving.isEnd(signal)).toBeFalsy();
      if (deriving.isStart(signal)) {
        talkback = deriving.unboxStart(signal);
      }
    };

    web.fromDomEvent(element as any, 'click')(sink);

    // Subscribing registers exactly the click handler and removes nothing yet.
    expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
    expect(element.removeEventListener).not.toHaveBeenCalled();

    // Second argument of the first `addEventListener` call is the handler.
    const [, listener] = element.addEventListener.mock.calls[0];

    // NOTE(review): the events dispatched here are never asserted as Push
    // signals, despite the test title; consider adding that check.
    listener(1);
    listener(2);

    // Closing the source must detach the very same handler.
    talkback(deriving.close);
    expect(element.removeEventListener).toHaveBeenCalledWith('click', listener);
  });
});