1import * as deriving from './helpers/wonka_deriving';
2import * as sources from './wonka_sources.gen';
3import * as types from './wonka_types.gen';
4import * as web from './web/wonkaJs.gen';
5
6import callbagFromArray from 'callbag-from-iter';
7import Observable from 'zen-observable';
8
9const collectSignals = (
10 source: types.sourceT<any>,
11 onStart?: (talkbackCb: (tb: types.talkbackT) => void) => void
12) => {
13 let talkback = null;
14 const signals = [];
15
16 source(signal => {
17 signals.push(signal);
18 if (deriving.isStart(signal)) {
19 talkback = deriving.unboxStart(signal);
20 if (onStart) onStart(talkback);
21 talkback(deriving.pull);
22 } else if (deriving.isPush(signal)) {
23 talkback(deriving.pull);
24 }
25 })
26
27 return signals;
28};
29
30/* When a Close talkback signal is sent the source should immediately end */
31const passesActiveClose = (source: types.sourceT<any>) =>
32 it('stops emitting when a Close talkback signal is received (spec)', () => {
33 let talkback = null;
34
35 const sink: types.sinkT<any> = signal => {
36 expect(deriving.isPush(signal)).toBeFalsy();
37 expect(deriving.isEnd(signal)).toBeFalsy();
38 if (deriving.isStart(signal)) {
39 talkback = deriving.unboxStart(signal);
40 talkback(deriving.close);
41 }
42 };
43
44 source(sink);
45 expect(talkback).not.toBe(null);
46 });
47
48/* All synchronous, cold sources won't send anything unless a Pull signal
49 has been received. */
50const passesColdPull = (source: types.sourceT<any>) =>
51 it('sends nothing when no Pull talkback signal has been sent (spec)', () => {
52 let pushes = 0;
53 let talkback = null;
54
55 const sink: types.sinkT<any> = signal => {
56 if (deriving.isPush(signal)) {
57 pushes++;
58 } else if (deriving.isStart(signal)) {
59 talkback = deriving.unboxStart(signal);
60 }
61 };
62
63 source(sink);
64 expect(talkback).not.toBe(null);
65 expect(pushes).toBe(0);
66
67 setTimeout(() => {
68 expect(pushes).toBe(0);
69 talkback(deriving.pull);
70 }, 10);
71
72 jest.runAllTimers();
73 expect(pushes).toBe(1);
74 });
75
76/* All synchronous, cold sources need to use trampoline scheduling to avoid
77 recursively sending more and more Push signals which would eventually lead
78 to a call stack overflow when too many values are emitted. */
79const passesTrampoline = (source: types.sourceT<any>) =>
80 it('uses trampoline scheduling instead of recursive push signals (spec)', () => {
81 let talkback = null;
82 let pushes = 0;
83
84 const signals = [];
85 const sink: types.sinkT<any> = signal => {
86 if (deriving.isPush(signal)) {
87 const lastPushes = ++pushes;
88 signals.push(signal);
89 talkback(deriving.pull);
90 expect(lastPushes).toBe(pushes);
91 } else if (deriving.isStart(signal)) {
92 talkback = deriving.unboxStart(signal);
93 talkback(deriving.pull);
94 expect(pushes).toBe(2);
95 } else if (deriving.isEnd(signal)) {
96 signals.push(signal);
97 expect(pushes).toBe(2);
98 }
99 };
100
101 source(sink);
102
103 expect(signals).toEqual([
104 deriving.push(1),
105 deriving.push(2),
106 deriving.end(),
107 ]);
108 });
109
110beforeEach(() => {
111 jest.useFakeTimers();
112});
113
114describe('fromArray', () => {
115 passesTrampoline(sources.fromArray([1, 2]));
116 passesColdPull(sources.fromArray([0]));
117 passesActiveClose(sources.fromArray([0]));
118});
119
120describe('fromList', () => {
121 passesTrampoline(sources.fromList([1, [2]] as any));
122 passesColdPull(sources.fromList([0] as any));
123 passesActiveClose(sources.fromList([0] as any));
124});
125
126describe('fromValue', () => {
127 passesColdPull(sources.fromValue(0));
128 passesActiveClose(sources.fromValue(0));
129
130 it('sends a single value and ends', () => {
131 expect(collectSignals(sources.fromValue(1))).toEqual([
132 deriving.start(expect.any(Function)),
133 deriving.push(1),
134 deriving.end()
135 ]);
136 });
137});
138
139describe('make', () => {
140 it('may be used to create async sources', () => {
141 const teardown = jest.fn();
142 const source = sources.make(observer => {
143 setTimeout(() => observer.next(1), 10);
144 setTimeout(() => observer.complete(), 20);
145 return teardown;
146 });
147
148 const signals = collectSignals(source);
149 expect(signals).toEqual([deriving.start(expect.any(Function))]);
150 jest.runAllTimers();
151
152 expect(signals).toEqual([
153 deriving.start(expect.any(Function)),
154 deriving.push(1),
155 deriving.end(),
156 ]);
157 });
158
159 it('supports active cancellation', () => {
160 const teardown = jest.fn();
161 const source = sources.make(() => teardown);
162
163 const sink: types.sinkT<any> = signal => {
164 expect(deriving.isPush(signal)).toBeFalsy();
165 expect(deriving.isEnd(signal)).toBeFalsy();
166 if (deriving.isStart(signal))
167 setTimeout(() => deriving.unboxStart(signal)(deriving.close));
168 };
169
170 source(sink);
171 expect(teardown).not.toHaveBeenCalled();
172 jest.runAllTimers();
173 expect(teardown).toHaveBeenCalled();
174 });
175});
176
177describe('makeSubject', () => {
178 it('may be used to emit signals programmatically', () => {
179 const { source, next, complete } = sources.makeSubject();
180 const signals = collectSignals(source);
181
182 expect(signals).toEqual([
183 deriving.start(expect.any(Function)),
184 ]);
185
186 next(1);
187
188 expect(signals).toEqual([
189 deriving.start(expect.any(Function)),
190 deriving.push(1),
191 ]);
192
193 complete();
194
195 expect(signals).toEqual([
196 deriving.start(expect.any(Function)),
197 deriving.push(1),
198 deriving.end(),
199 ]);
200 });
201
202 it('ignores signals after complete has been called', () => {
203 const { source, next, complete } = sources.makeSubject();
204 const signals = collectSignals(source);
205 complete();
206
207 expect(signals).toEqual([
208 deriving.start(expect.any(Function)),
209 deriving.end(),
210 ]);
211
212 next(1);
213 complete();
214 expect(signals.length).toBe(2);
215 });
216});
217
218describe('never', () => {
219 it('emits nothing and ends immediately', () => {
220 const signals = collectSignals(sources.never);
221 expect(signals).toEqual([deriving.start(expect.any(Function)) ]);
222 });
223});
224
225describe('empty', () => {
226 it('emits nothing and ends immediately', () => {
227 const signals = collectSignals(sources.empty);
228
229 expect(signals).toEqual([
230 deriving.start(expect.any(Function)),
231 deriving.end(),
232 ]);
233 });
234});
235
236describe('fromPromise', () => {
237 passesActiveClose(web.fromPromise(Promise.resolve(null)));
238
239 it('emits a value when the promise resolves', async () => {
240 const promise = Promise.resolve(1);
241 const signals = collectSignals(web.fromPromise(promise));
242
243 expect(signals).toEqual([
244 deriving.start(expect.any(Function)),
245 ]);
246
247 await promise;
248
249 expect(signals).toEqual([
250 deriving.start(expect.any(Function)),
251 deriving.push(1),
252 deriving.end(),
253 ]);
254 });
255});
256
257describe('fromObservable', () => {
258 beforeEach(() => {
259 jest.useRealTimers();
260 });
261
262 it('converts an Observable to a Wonka source', async () => {
263 const source = web.fromObservable(Observable.from([1, 2]));
264 const signals = collectSignals(source);
265
266 await new Promise(resolve => setTimeout(resolve));
267
268 expect(signals).toEqual([
269 deriving.start(expect.any(Function)),
270 deriving.push(1),
271 deriving.push(2),
272 deriving.end(),
273 ]);
274 });
275
276 it('supports cancellation on converted Observables', async () => {
277 const source = web.fromObservable(Observable.from([1, 2]));
278 const signals = collectSignals(source, talkback => {
279 talkback(deriving.close);
280 });
281
282 await new Promise(resolve => setTimeout(resolve));
283
284 expect(signals).toEqual([
285 deriving.start(expect.any(Function)),
286 ]);
287 });
288});
289
290describe('fromCallbag', () => {
291 it('converts a Callbag to a Wonka source', () => {
292 const source = web.fromCallbag(callbagFromArray([1, 2]));
293 const signals = collectSignals(source);
294
295 expect(signals).toEqual([
296 deriving.start(expect.any(Function)),
297 deriving.push(1),
298 deriving.push(2),
299 deriving.end(),
300 ]);
301 });
302
303 it('supports cancellation on converted Observables', () => {
304 const source = web.fromCallbag(callbagFromArray([1, 2]));
305 const signals = collectSignals(source, talkback => {
306 talkback(deriving.close);
307 });
308
309 expect(signals).toEqual([
310 deriving.start(expect.any(Function)),
311 ]);
312 });
313});
314
315describe('interval', () => {
316 it('emits Push signals until Cancel is sent', () => {
317 let pushes = 0;
318 let talkback = null;
319
320 const sink: types.sinkT<any> = signal => {
321 if (deriving.isPush(signal)) {
322 pushes++;
323 } else if (deriving.isStart(signal)) {
324 talkback = deriving.unboxStart(signal);
325 }
326 };
327
328 web.interval(100)(sink);
329 expect(talkback).not.toBe(null);
330 expect(pushes).toBe(0);
331
332 jest.advanceTimersByTime(100);
333 expect(pushes).toBe(1);
334 jest.advanceTimersByTime(100);
335 expect(pushes).toBe(2);
336
337 talkback(deriving.close);
338 jest.advanceTimersByTime(100);
339 expect(pushes).toBe(2);
340 });
341});
342
343describe('fromDomEvent', () => {
344 it('emits Push signals for events on a DOM element', () => {
345 let talkback = null;
346
347 const element = {
348 addEventListener: jest.fn(),
349 removeEventListener: jest.fn(),
350 };
351
352 const sink: types.sinkT<any> = signal => {
353 expect(deriving.isEnd(signal)).toBeFalsy();
354 if (deriving.isStart(signal))
355 talkback = deriving.unboxStart(signal);
356 };
357
358 web.fromDomEvent(element as any, 'click')(sink);
359
360 expect(element.addEventListener).toHaveBeenCalledWith('click', expect.any(Function));
361 expect(element.removeEventListener).not.toHaveBeenCalled();
362 const listener = element.addEventListener.mock.calls[0][1];
363
364 listener(1);
365 listener(2);
366 talkback(deriving.close);
367 expect(element.removeEventListener).toHaveBeenCalledWith('click', listener);
368 });
369});