1import * as deriving from './helpers/wonka_deriving';
2import * as sinks from './wonka_sinks.gen';
3import * as web from './web/wonkaJs.gen';
4import * as types from './wonka_types.gen';
5
6import Observable from 'zen-observable';
7import callbagIterate from 'callbag-iterate';
8import callbagTake from 'callbag-take';
9
10describe('subscribe', () => {
11 it('sends Pull talkback signals every Push signal', () => {
12 let pulls = 0;
13 const fn = jest.fn();
14
15 const source: types.sourceT<any> = sink => {
16 sink(deriving.start(tb => {
17 if (tb === deriving.pull) {
18 if (pulls < 3) {
19 pulls++;
20 sink(deriving.push(0));
21 } else {
22 sink(deriving.end());
23 expect(pulls).toBe(3);
24 }
25 }
26 }));
27 };
28
29 sinks.subscribe(fn)(source);
30 expect(fn).toHaveBeenCalledTimes(3);
31 expect(pulls).toBe(3);
32 });
33
34 it('cancels when unsubscribe is called', () => {
35 let pulls = 0;
36 let closing = 0;
37
38 const source: types.sourceT<any> = sink => {
39 sink(deriving.start(tb => {
40 if (tb === deriving.pull) {
41 if (!pulls) {
42 pulls++;
43 sink(deriving.push(0));
44 }
45 } else if (tb === deriving.close) {
46 closing++;
47 }
48 }));
49 };
50
51 const sub = sinks.subscribe(() => {})(source);
52 expect(pulls).toBe(1);
53
54 sub.unsubscribe();
55 expect(closing).toBe(1);
56 });
57
58 it('ignores cancellation when the source has already ended', () => {
59 let pulls = 0;
60 let closing = 0;
61
62 const source: types.sourceT<any> = sink => {
63 sink(deriving.start(tb => {
64 if (tb === deriving.pull) {
65 pulls++;
66 sink(deriving.end());
67 } else if (tb === deriving.close) {
68 closing++;
69 }
70 }));
71 };
72
73 const sub = sinks.subscribe(() => {})(source);
74 expect(pulls).toBe(1);
75 sub.unsubscribe();
76 expect(closing).toBe(0);
77 });
78
79 it('ignores Push signals after the source has ended', () => {
80 const fn = jest.fn();
81 const source: types.sourceT<any> = sink => {
82 sink(deriving.start(tb => {
83 if (tb === deriving.pull) {
84 sink(deriving.end());
85 sink(deriving.push(0));
86 }
87 }));
88 };
89
90 sinks.subscribe(fn)(source);
91 expect(fn).not.toHaveBeenCalled();
92 });
93
94 it('ignores Push signals after cancellation', () => {
95 const fn = jest.fn();
96 const source: types.sourceT<any> = sink => {
97 sink(deriving.start(tb => {
98 if (tb === deriving.close) {
99 sink(deriving.push(0));
100 }
101 }));
102 };
103
104 sinks.subscribe(fn)(source).unsubscribe();
105 expect(fn).not.toHaveBeenCalled();
106 });
107});
108
109describe('publish', () => {
110 it('sends Pull talkback signals every Push signal', () => {
111 let pulls = 0;
112 const source: types.sourceT<any> = sink => {
113 sink(deriving.start(tb => {
114 if (tb === deriving.pull) {
115 if (pulls < 3) {
116 pulls++;
117 sink(deriving.push(0));
118 } else {
119 sink(deriving.end());
120 expect(pulls).toBe(3);
121 }
122 }
123 }));
124 };
125
126 sinks.publish(source);
127 expect(pulls).toBe(3);
128 });
129});
130
131describe('toArray', () => {
132 it('sends Pull talkback signals every Push signal', () => {
133 let pulls = 0;
134 const source: types.sourceT<any> = sink => {
135 sink(deriving.start(tb => {
136 if (tb === deriving.pull) {
137 if (pulls < 3) {
138 pulls++;
139 sink(deriving.push(0));
140 } else {
141 sink(deriving.end());
142 expect(pulls).toBe(3);
143 }
144 }
145 }));
146 };
147
148 const array = sinks.toArray(source);
149 expect(array).toEqual([0, 0, 0]);
150 expect(pulls).toBe(3);
151 });
152
153 it('sends a Close talkback signal after all synchronous values have been pulled', () => {
154 let pulls = 0;
155 let ending = 0;
156
157 const source: types.sourceT<any> = sink => {
158 sink(deriving.start(tb => {
159 if (tb === deriving.pull) {
160 if (!pulls) {
161 pulls++;
162 sink(deriving.push(0));
163 }
164 } else if (tb === deriving.close) {
165 ending++;
166 }
167 }));
168 };
169
170 const array = sinks.toArray(source);
171 expect(array).toEqual([0]);
172 expect(ending).toBe(1);
173 });
174});
175
176describe('toPromise', () => {
177 it('creates a Promise that resolves on the last value', async () => {
178 let pulls = 0;
179 let sink = null;
180
181 const source: types.sourceT<any> = _sink => {
182 sink = _sink;
183 sink(deriving.start(tb => {
184 if (tb === deriving.pull)
185 pulls++;
186 }));
187 };
188
189 const fn = jest.fn();
190 const promise = web.toPromise(source).then(fn);
191
192 expect(pulls).toBe(1);
193 sink(deriving.push(0));
194 expect(pulls).toBe(2);
195 sink(deriving.push(1));
196 sink(deriving.end());
197 expect(fn).not.toHaveBeenCalled();
198
199 await promise;
200 expect(fn).toHaveBeenCalledWith(1);
201 });
202});
203
204describe('toObservable', () => {
205 it('creates an Observable mirroring the Wonka source', () => {
206 const next = jest.fn();
207 const complete = jest.fn();
208 let pulls = 0;
209 let sink = null;
210
211 const source: types.sourceT<any> = _sink => {
212 sink = _sink;
213 sink(deriving.start(tb => {
214 if (tb === deriving.pull)
215 pulls++;
216 }));
217 };
218
219 Observable.from(web.toObservable(source) as any).subscribe({
220 next,
221 complete,
222 });
223
224 expect(pulls).toBe(1);
225 sink(deriving.push(0));
226 expect(next).toHaveBeenCalledWith(0);
227 sink(deriving.push(1));
228 expect(next).toHaveBeenCalledWith(1);
229 sink(deriving.end());
230 expect(complete).toHaveBeenCalled();
231 });
232
233 it('forwards cancellations from the Observable as a talkback', () => {
234 let ending = 0;
235 const source: types.sourceT<any> = sink =>
236 sink(deriving.start(tb => {
237 if (tb === deriving.close)
238 ending++;
239 }));
240
241 const sub = Observable.from(web.toObservable(source) as any).subscribe({});
242
243 expect(ending).toBe(0);
244 sub.unsubscribe();
245 expect(ending).toBe(1);
246 });
247});
248
249describe('toCallbag', () => {
250 it('creates a Callbag mirroring the Wonka source', () => {
251 const fn = jest.fn();
252 let pulls = 0;
253 let sink = null;
254
255 const source: types.sourceT<any> = _sink => {
256 sink = _sink;
257 sink(deriving.start(tb => {
258 if (tb === deriving.pull)
259 pulls++;
260 }));
261 };
262
263 callbagIterate(fn)(web.toCallbag(source));
264
265 expect(pulls).toBe(1);
266 sink(deriving.push(0));
267 expect(fn).toHaveBeenCalledWith(0);
268 sink(deriving.push(1));
269 expect(fn).toHaveBeenCalledWith(1);
270 sink(deriving.end());
271 });
272
273 it('forwards cancellations from the Callbag as a talkback', () => {
274 let ending = 0;
275 const fn = jest.fn();
276
277 const source: types.sourceT<any> = sink =>
278 sink(deriving.start(tb => {
279 if (tb === deriving.pull)
280 sink(deriving.push(0));
281 if (tb === deriving.close)
282 ending++;
283 }));
284
285 callbagIterate(fn)(callbagTake(1)(web.toCallbag(source) as any));
286
287 expect(fn.mock.calls).toEqual([[0]]);
288 expect(ending).toBe(1);
289 });
290});