1import * as deriving from './helpers/wonka_deriving';
2import * as sinks from './wonka_sinks.gen';
3import * as sources from './wonka_sources.gen';
4import * as web from './web/wonkaJs.gen';
5import * as types from './wonka_types.gen';
6
7import Observable from 'zen-observable';
8import callbagIterate from 'callbag-iterate';
9import callbagTake from 'callbag-take';
10
describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const onValue = jest.fn();
    let pullCount = 0;

    // Answers each of the first three Pull signals with a Push of 0 and
    // answers the next Pull with End.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal !== deriving.pull) return;

          if (pullCount >= 3) {
            observer(deriving.end());
            expect(pullCount).toBe(3);
            return;
          }

          pullCount += 1;
          observer(deriving.push(0));
        })
      );
    };

    sinks.subscribe(onValue)(source);

    expect(onValue).toHaveBeenCalledTimes(3);
    expect(pullCount).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Pushes a single value on the first Pull and counts Close signals.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            if (pullCount === 0) {
              pullCount += 1;
              observer(deriving.push(0));
            }
            return;
          }

          if (signal === deriving.close) {
            closeCount += 1;
          }
        })
      );
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);

    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Ends as soon as it is pulled; any Close received is counted so the
    // test can assert that none arrives.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            pullCount += 1;
            observer(deriving.end());
            return;
          }

          if (signal === deriving.close) {
            closeCount += 1;
          }
        })
      );
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);

    subscription.unsubscribe();
    expect(closeCount).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const onValue = jest.fn();

    // Misbehaving source: sends a Push right after it has sent End.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal !== deriving.pull) return;

          observer(deriving.end());
          observer(deriving.push(0));
        })
      );
    };

    sinks.subscribe(onValue)(source);

    expect(onValue).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const onValue = jest.fn();

    // Misbehaving source: responds to Close by pushing a value.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal !== deriving.close) return;

          observer(deriving.push(0));
        })
      );
    };

    const subscription = sinks.subscribe(onValue)(source);
    subscription.unsubscribe();

    expect(onValue).not.toHaveBeenCalled();
  });
});
109
describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers each of the first three Pull signals with a Push of 0 and
    // answers the next Pull with End.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal !== deriving.pull) return;

          if (pullCount >= 3) {
            observer(deriving.end());
            expect(pullCount).toBe(3);
            return;
          }

          pullCount += 1;
          observer(deriving.push(0));
        })
      );
    };

    sinks.publish(source);

    expect(pullCount).toBe(3);
  });
});
131
describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers each of the first three Pull signals with a Push of 0 and
    // answers the next Pull with End.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal !== deriving.pull) return;

          if (pullCount >= 3) {
            observer(deriving.end());
            expect(pullCount).toBe(3);
            return;
          }

          pullCount += 1;
          observer(deriving.push(0));
        })
      );
    };

    const collected = sinks.toArray(source);

    expect(collected).toEqual([0, 0, 0]);
    expect(pullCount).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Pushes one value on the first Pull and never sends End, so the
    // only way the sink can finish is by closing the source itself.
    const source: types.sourceT<any> = observer => {
      observer(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            if (pullCount === 0) {
              pullCount += 1;
              observer(deriving.push(0));
            }
            return;
          }

          if (signal === deriving.close) {
            closeCount += 1;
          }
        })
      );
    };

    const collected = sinks.toArray(source);

    expect(collected).toEqual([0]);
    expect(closeCount).toBe(1);
  });
});
176
describe('toPromise', () => {
  // Sink and signal types derived from the source type, so the captured
  // sink below is properly typed instead of starting life as an untyped
  // `null`.
  type Sink = Parameters<types.sourceT<any>>[0];
  type Signal = Parameters<Sink>[0];

  it('creates a Promise that resolves on the last value', async () => {
    let pulls = 0;
    let sink: Sink | undefined;

    // Forwards a signal to the captured sink; fails loudly if the source
    // was never started instead of crashing on a call to a non-function.
    const emit = (signal: Signal): void => {
      if (sink === undefined) {
        throw new Error('source has not been started');
      }
      sink(signal);
    };

    // Manually driven source: captures the sink so the test can push
    // signals itself, and counts incoming Pull signals.
    const source: types.sourceT<any> = _sink => {
      sink = _sink;
      _sink(
        deriving.start(tb => {
          if (tb === deriving.pull) pulls++;
        })
      );
    };

    const fn = jest.fn();
    const promise = web.toPromise(source).then(fn);

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(pulls).toBe(2);
    emit(deriving.push(1));
    emit(deriving.end());
    // The `then` callback must not have run synchronously.
    expect(fn).not.toHaveBeenCalled();

    await promise;
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const fn = jest.fn();
    await web.toPromise(sources.fromArray([1, 2, 3])).then(fn);
    expect(fn).toHaveBeenCalledWith(3);
  });
});
210
describe('toObservable', () => {
  // Sink and signal types derived from the source type, so the captured
  // sink below is properly typed instead of starting life as an untyped
  // `null`.
  type Sink = Parameters<types.sourceT<any>>[0];
  type Signal = Parameters<Sink>[0];

  it('creates an Observable mirroring the Wonka source', () => {
    const next = jest.fn();
    const complete = jest.fn();
    let pulls = 0;
    let sink: Sink | undefined;

    // Forwards a signal to the captured sink; fails loudly if the source
    // was never started instead of crashing on a call to a non-function.
    const emit = (signal: Signal): void => {
      if (sink === undefined) {
        throw new Error('source has not been started');
      }
      sink(signal);
    };

    // Manually driven source: captures the sink so the test can push
    // signals itself, and counts incoming Pull signals.
    const source: types.sourceT<any> = _sink => {
      sink = _sink;
      _sink(
        deriving.start(tb => {
          if (tb === deriving.pull) pulls++;
        })
      );
    };

    // NOTE(review): the `as any` cast is kept from the original; presumably
    // the generated return type does not line up with zen-observable's
    // typings — confirm before tightening.
    Observable.from(web.toObservable(source) as any).subscribe({
      next,
      complete,
    });

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(next).toHaveBeenCalledWith(0);
    emit(deriving.push(1));
    expect(next).toHaveBeenCalledWith(1);
    emit(deriving.end());
    expect(complete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let ending = 0;

    // Source that never emits; it only counts Close talkback signals.
    const source: types.sourceT<any> = sink =>
      sink(
        deriving.start(tb => {
          if (tb === deriving.close) ending++;
        })
      );

    const sub = Observable.from(web.toObservable(source) as any).subscribe({});

    expect(ending).toBe(0);
    sub.unsubscribe();
    expect(ending).toBe(1);
  });
});
255
describe('toCallbag', () => {
  // Sink and signal types derived from the source type, so the captured
  // sink below is properly typed instead of starting life as an untyped
  // `null`.
  type Sink = Parameters<types.sourceT<any>>[0];
  type Signal = Parameters<Sink>[0];

  it('creates a Callbag mirroring the Wonka source', () => {
    const fn = jest.fn();
    let pulls = 0;
    let sink: Sink | undefined;

    // Forwards a signal to the captured sink; fails loudly if the source
    // was never started instead of crashing on a call to a non-function.
    const emit = (signal: Signal): void => {
      if (sink === undefined) {
        throw new Error('source has not been started');
      }
      sink(signal);
    };

    // Manually driven source: captures the sink so the test can push
    // signals itself, and counts incoming Pull signals.
    const source: types.sourceT<any> = _sink => {
      sink = _sink;
      _sink(
        deriving.start(tb => {
          if (tb === deriving.pull) pulls++;
        })
      );
    };

    callbagIterate(fn)(web.toCallbag(source));

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(fn).toHaveBeenCalledWith(0);
    emit(deriving.push(1));
    expect(fn).toHaveBeenCalledWith(1);
    emit(deriving.end());
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let ending = 0;
    const fn = jest.fn();

    // Pushes 0 on every Pull and counts Close talkback signals.
    const source: types.sourceT<any> = sink =>
      sink(
        deriving.start(tb => {
          if (tb === deriving.pull) sink(deriving.push(0));
          if (tb === deriving.close) ending++;
        })
      );

    // `callbagTake(1)` stops after the first value; the assertions below
    // check that exactly one value got through and one Close was sent.
    callbagIterate(fn)(callbagTake(1)(web.toCallbag(source) as any));

    expect(fn.mock.calls).toEqual([[0]]);
    expect(ending).toBe(1);
  });
});