1import { Source, Sink, SignalKind, TalkbackKind } from '../types';
2import { push, start } from '../helpers';
3
4import * as sinks from '../sinks';
5import * as sources from '../sources';
6import * as callbag from '../callbag';
7import * as observable from '../observable';
8
9import Observable from 'zen-observable';
10import callbagIterate from 'callbag-iterate';
11import callbagTake from 'callbag-take';
12
describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const onValue = jest.fn();
    let pullCount = 0;

    // Answers the first three Pulls with a value and any later Pull with End.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(startSignal);
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).toHaveBeenCalledTimes(3);
    expect(pullCount).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Answers only the first Pull; every non-Pull talkback signal is counted.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) {
          closeCount++;
          return;
        }

        if (pullCount === 0) {
          pullCount++;
          sink(push(0));
        }
      });

      sink(startSignal);
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);

    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pullCount = 0;
    let closeCount = 0;

    // Answers a Pull with End; every non-Pull talkback signal is counted.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        const isPull = signal === TalkbackKind.Pull;
        if (!isPull) {
          closeCount++;
          return;
        }

        pullCount++;
        sink(SignalKind.End);
      });

      sink(startSignal);
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(pullCount).toBe(1);
    subscription.unsubscribe();
    expect(closeCount).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const onValue = jest.fn();

    // Misbehaving source: pushes a value right after it has sent End.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        sink(SignalKind.End);
        sink(push(0));
      });

      sink(startSignal);
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const onValue = jest.fn();

    // Misbehaving source: reacts to a Close talkback by pushing a value.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal === TalkbackKind.Close) sink(push(0));
      });

      sink(startSignal);
    };

    const subscription = sinks.subscribe(onValue)(source);
    subscription.unsubscribe();
    expect(onValue).not.toHaveBeenCalled();
  });
});
121
describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers the first three Pulls with a value and any later Pull with End.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(startSignal);
    };

    sinks.publish(source);
    expect(pullCount).toBe(3);
  });
});
145
describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pullCount = 0;

    // Answers the first three Pulls with a value and any later Pull with End.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) return;

        if (pullCount >= 3) {
          sink(SignalKind.End);
          expect(pullCount).toBe(3);
          return;
        }

        pullCount++;
        sink(push(0));
      });

      sink(startSignal);
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0, 0, 0]);
    expect(pullCount).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pullCount = 0;
    let endCount = 0;

    // Answers only the first Pull and never ends on its own; every non-Pull
    // talkback signal is counted.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal !== TalkbackKind.Pull) {
          endCount++;
          return;
        }

        if (pullCount === 0) {
          pullCount++;
          sink(push(0));
        }
      });

      sink(startSignal);
    };

    const collected = sinks.toArray(source);
    expect(collected).toEqual([0]);
    expect(endCount).toBe(1);
  });
});
194
describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    const onResolved = jest.fn();
    let pullCount = 0;
    // Captured so the test body can drive the sink by hand after subscription.
    let capturedSink: Sink<any> | null = null;

    const source: Source<any> = incoming => {
      capturedSink = incoming;

      const startSignal = start(signal => {
        if (signal === TalkbackKind.Pull) {
          pullCount++;
        }
      });

      incoming(startSignal);
    };

    const settled = sinks.toPromise(source).then(onResolved);

    expect(pullCount).toBe(1);
    capturedSink!(push(0));
    expect(pullCount).toBe(2);
    capturedSink!(push(1));
    capturedSink!(SignalKind.End);
    // The `.then` callback has not run yet at this synchronous point.
    expect(onResolved).not.toHaveBeenCalled();

    await settled;
    expect(onResolved).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const onResolved = jest.fn();
    const arraySource = sources.fromArray([1, 2, 3]);

    await sinks.toPromise(arraySource).then(onResolved);
    expect(onResolved).toHaveBeenCalledWith(3);
  });
});
229
describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const onNext = jest.fn();
    const onComplete = jest.fn();
    let pullCount = 0;
    // Captured so the test body can drive the sink by hand after subscription.
    let capturedSink: Sink<any> | null = null;

    const source: Source<any> = incoming => {
      capturedSink = incoming;

      const startSignal = start(signal => {
        if (signal === TalkbackKind.Pull) {
          pullCount++;
        }
      });

      incoming(startSignal);
    };

    const wrapped = Observable.from(observable.toObservable(source) as any);
    wrapped.subscribe({
      next: onNext,
      complete: onComplete,
    });

    expect(pullCount).toBe(1);
    capturedSink!(push(0));
    expect(onNext).toHaveBeenCalledWith(0);
    capturedSink!(push(1));
    expect(onNext).toHaveBeenCalledWith(1);
    capturedSink!(SignalKind.End);
    expect(onComplete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let closeCount = 0;

    // Never emits anything; only counts Close talkback signals.
    const source: Source<any> = sink => {
      const startSignal = start(signal => {
        if (signal === TalkbackKind.Close) {
          closeCount++;
        }
      });

      return sink(startSignal);
    };

    const wrapped = Observable.from(observable.toObservable(source) as any);
    const subscription = wrapped.subscribe({});

    expect(closeCount).toBe(0);
    subscription.unsubscribe();
    expect(closeCount).toBe(1);
  });
});
276
describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const fn = jest.fn();
    let pulls = 0;
    // Captured so the test body can drive the sink by hand after subscription.
    let sink: Sink<any> | null = null;

    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    callbagIterate(fn)(callbag.toCallbag(source));

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(fn).toHaveBeenCalledWith(0);
    sink!(push(1));
    expect(fn).toHaveBeenCalledWith(1);
    sink!(SignalKind.End);
    // Ending the source must not deliver anything further to the consumer:
    // exactly the two pushed values were seen.
    expect(fn).toHaveBeenCalledTimes(2);
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let ending = 0;
    const fn = jest.fn();

    // Answers every Pull with a value; every non-Pull talkback signal is counted.
    const source: Source<any> = sink =>
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(push(0));
          } else {
            ending++;
          }
        })
      );

    // The consumer is limited to a single value via callbagTake(1).
    callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any));

    expect(fn.mock.calls).toEqual([[0]]);
    expect(ending).toBe(1);
  });
});