1import { describe, it, expect, vi } from 'vitest';
2
3import { Source, Sink, SignalKind, TalkbackKind } from '../types';
4import { push, start } from '../helpers';
5
6import * as sinks from '../sinks';
7import * as sources from '../sources';
8import * as callbag from '../callbag';
9import * as observable from '../observable';
10
11import Observable from 'zen-observable';
12import callbagIterate from 'callbag-iterate';
13import callbagTake from 'callbag-take';
14
15describe('subscribe', () => {
16 it('sends Pull talkback signals every Push signal', () => {
17 let pulls = 0;
18 const fn = vi.fn();
19
20 const source: Source<any> = sink => {
21 sink(
22 start(signal => {
23 if (signal === TalkbackKind.Pull) {
24 if (pulls < 3) {
25 pulls++;
26 sink(push(0));
27 } else {
28 sink(SignalKind.End);
29 expect(pulls).toBe(3);
30 }
31 }
32 })
33 );
34 };
35
36 sinks.subscribe(fn)(source);
37 expect(fn).toHaveBeenCalledTimes(3);
38 expect(pulls).toBe(3);
39 });
40
41 it('cancels when unsubscribe is called', () => {
42 let pulls = 0;
43 let closing = 0;
44
45 const source: Source<any> = sink => {
46 sink(
47 start(signal => {
48 if (signal === TalkbackKind.Pull) {
49 if (!pulls) {
50 pulls++;
51 sink(push(0));
52 }
53 } else {
54 closing++;
55 }
56 })
57 );
58 };
59
60 const sub = sinks.subscribe(() => {})(source);
61 expect(pulls).toBe(1);
62
63 sub.unsubscribe();
64 expect(closing).toBe(1);
65 });
66
67 it('ignores cancellation when the source has already ended', () => {
68 let pulls = 0;
69 let closing = 0;
70
71 const source: Source<any> = sink => {
72 sink(
73 start(signal => {
74 if (signal === TalkbackKind.Pull) {
75 pulls++;
76 sink(SignalKind.End);
77 } else {
78 closing++;
79 }
80 })
81 );
82 };
83
84 const sub = sinks.subscribe(() => {})(source);
85 expect(pulls).toBe(1);
86 sub.unsubscribe();
87 expect(closing).toBe(0);
88 });
89
90 it('ignores Push signals after the source has ended', () => {
91 const fn = vi.fn();
92 const source: Source<any> = sink => {
93 sink(
94 start(signal => {
95 if (signal === TalkbackKind.Pull) {
96 sink(SignalKind.End);
97 sink(push(0));
98 }
99 })
100 );
101 };
102
103 sinks.subscribe(fn)(source);
104 expect(fn).not.toHaveBeenCalled();
105 });
106
107 it('ignores Push signals after cancellation', () => {
108 const fn = vi.fn();
109 const source: Source<any> = sink => {
110 sink(
111 start(signal => {
112 if (signal === TalkbackKind.Close) {
113 sink(push(0));
114 }
115 })
116 );
117 };
118
119 sinks.subscribe(fn)(source).unsubscribe();
120 expect(fn).not.toHaveBeenCalled();
121 });
122});
123
124describe('publish', () => {
125 it('sends Pull talkback signals every Push signal', () => {
126 let pulls = 0;
127 const source: Source<any> = sink => {
128 sink(
129 start(signal => {
130 if (signal === TalkbackKind.Pull) {
131 if (pulls < 3) {
132 pulls++;
133 sink(push(0));
134 } else {
135 sink(SignalKind.End);
136 expect(pulls).toBe(3);
137 }
138 }
139 })
140 );
141 };
142
143 sinks.publish(source);
144 expect(pulls).toBe(3);
145 });
146});
147
148describe('toArray', () => {
149 it('sends Pull talkback signals every Push signal', () => {
150 let pulls = 0;
151 const source: Source<any> = sink => {
152 sink(
153 start(signal => {
154 if (signal === TalkbackKind.Pull) {
155 if (pulls < 3) {
156 pulls++;
157 sink(push(0));
158 } else {
159 sink(SignalKind.End);
160 expect(pulls).toBe(3);
161 }
162 }
163 })
164 );
165 };
166
167 const array = sinks.toArray(source);
168 expect(array).toEqual([0, 0, 0]);
169 expect(pulls).toBe(3);
170 });
171
172 it('sends a Close talkback signal after all synchronous values have been pulled', () => {
173 let pulls = 0;
174 let ending = 0;
175
176 const source: Source<any> = sink => {
177 sink(
178 start(signal => {
179 if (signal === TalkbackKind.Pull) {
180 if (!pulls) {
181 pulls++;
182 sink(push(0));
183 }
184 } else {
185 ending++;
186 }
187 })
188 );
189 };
190
191 const array = sinks.toArray(source);
192 expect(array).toEqual([0]);
193 expect(ending).toBe(1);
194 });
195});
196
197describe('toPromise', () => {
198 it('creates a Promise that resolves on the last value', async () => {
199 let pulls = 0;
200 let sink: Sink<any> | null = null;
201
202 const source: Source<any> = _sink => {
203 sink = _sink;
204 sink(
205 start(signal => {
206 if (signal === TalkbackKind.Pull) pulls++;
207 })
208 );
209 };
210
211 const fn = vi.fn();
212 const promise = sinks.toPromise(source).then(fn);
213
214 expect(pulls).toBe(1);
215 sink!(push(0));
216 expect(pulls).toBe(2);
217 sink!(push(1));
218 sink!(SignalKind.End);
219 expect(fn).not.toHaveBeenCalled();
220
221 await promise;
222 expect(fn).toHaveBeenCalledWith(1);
223 });
224
225 it('creates a Promise for synchronous sources', async () => {
226 const fn = vi.fn();
227 await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn);
228 expect(fn).toHaveBeenCalledWith(3);
229 });
230});
231
232describe('toObservable', () => {
233 it('creates an Observable mirroring the Wonka source', () => {
234 const next = vi.fn();
235 const complete = vi.fn();
236 let pulls = 0;
237 let sink: Sink<any> | null = null;
238
239 const source: Source<any> = _sink => {
240 sink = _sink;
241 sink(
242 start(signal => {
243 if (signal === TalkbackKind.Pull) pulls++;
244 })
245 );
246 };
247
248 Observable.from(observable.toObservable(source) as any).subscribe({
249 next,
250 complete,
251 });
252
253 expect(pulls).toBe(1);
254 sink!(push(0));
255 expect(next).toHaveBeenCalledWith(0);
256 sink!(push(1));
257 expect(next).toHaveBeenCalledWith(1);
258 sink!(SignalKind.End);
259 expect(complete).toHaveBeenCalled();
260 });
261
262 it('forwards cancellations from the Observable as a talkback', () => {
263 let ending = 0;
264 const source: Source<any> = sink =>
265 sink(
266 start(signal => {
267 if (signal === TalkbackKind.Close) ending++;
268 })
269 );
270
271 const sub = Observable.from(observable.toObservable(source) as any).subscribe({});
272
273 expect(ending).toBe(0);
274 sub.unsubscribe();
275 expect(ending).toBe(1);
276 });
277});
278
279describe('toCallbag', () => {
280 it('creates a Callbag mirroring the Wonka source', () => {
281 const fn = vi.fn();
282 let pulls = 0;
283 let sink: Sink<any> | null = null;
284
285 const source: Source<any> = _sink => {
286 sink = _sink;
287 sink(
288 start(signal => {
289 if (signal === TalkbackKind.Pull) pulls++;
290 })
291 );
292 };
293
294 callbagIterate(fn)(callbag.toCallbag(source));
295
296 expect(pulls).toBe(1);
297 sink!(push(0));
298 expect(fn).toHaveBeenCalledWith(0);
299 sink!(push(1));
300 expect(fn).toHaveBeenCalledWith(1);
301 sink!(SignalKind.End);
302 });
303
304 it('forwards cancellations from the Callbag as a talkback', () => {
305 let ending = 0;
306 const fn = vi.fn();
307
308 const source: Source<any> = sink =>
309 sink(
310 start(signal => {
311 if (signal === TalkbackKind.Pull) {
312 sink(push(0));
313 } else {
314 ending++;
315 }
316 })
317 );
318
319 callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any));
320
321 expect(fn.mock.calls).toEqual([[0]]);
322 expect(ending).toBe(1);
323 });
324});