Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { describe, it, expect, vi } from 'vitest'; 2 3import { Source, Sink, SignalKind, TalkbackKind } from '../types'; 4import { push, start } from '../helpers'; 5 6import * as sinks from '../sinks'; 7import * as sources from '../sources'; 8import * as callbag from '../callbag'; 9import * as observable from '../observable'; 10 11import Observable from 'zen-observable'; 12import callbagIterate from 'callbag-iterate'; 13import callbagTake from 'callbag-take'; 14 15describe('subscribe', () => { 16 it('sends Pull talkback signals every Push signal', () => { 17 let pulls = 0; 18 const fn = vi.fn(); 19 20 const source: Source<any> = sink => { 21 sink( 22 start(signal => { 23 if (signal === TalkbackKind.Pull) { 24 if (pulls < 3) { 25 pulls++; 26 sink(push(0)); 27 } else { 28 sink(SignalKind.End); 29 expect(pulls).toBe(3); 30 } 31 } 32 }) 33 ); 34 }; 35 36 sinks.subscribe(fn)(source); 37 expect(fn).toHaveBeenCalledTimes(3); 38 expect(pulls).toBe(3); 39 }); 40 41 it('cancels when unsubscribe is called', () => { 42 let pulls = 0; 43 let closing = 0; 44 45 const source: Source<any> = sink => { 46 sink( 47 start(signal => { 48 if (signal === TalkbackKind.Pull) { 49 if (!pulls) { 50 pulls++; 51 sink(push(0)); 52 } 53 } else { 54 closing++; 55 } 56 }) 57 ); 58 }; 59 60 const sub = sinks.subscribe(() => {})(source); 61 expect(pulls).toBe(1); 62 63 sub.unsubscribe(); 64 expect(closing).toBe(1); 65 }); 66 67 it('ignores cancellation when the source has already ended', () => { 68 let pulls = 0; 69 let closing = 0; 70 71 const source: Source<any> = sink => { 72 sink( 73 start(signal => { 74 if (signal === TalkbackKind.Pull) { 75 pulls++; 76 sink(SignalKind.End); 77 } else { 78 closing++; 79 } 80 }) 81 ); 82 }; 83 84 const sub = sinks.subscribe(() => {})(source); 85 expect(pulls).toBe(1); 86 sub.unsubscribe(); 87 expect(closing).toBe(0); 88 }); 89 90 it('ignores Push signals after the source has ended', () => { 91 const fn = vi.fn(); 92 const source: Source<any> = sink => { 93 sink( 94 start(signal => { 95 if (signal === TalkbackKind.Pull) { 96 sink(SignalKind.End); 97 sink(push(0)); 98 } 99 }) 100 ); 101 }; 102 103 sinks.subscribe(fn)(source); 104 expect(fn).not.toHaveBeenCalled(); 105 }); 106 107 it('ignores Push signals after cancellation', () => { 108 const fn = vi.fn(); 109 const source: Source<any> = sink => { 110 sink( 111 start(signal => { 112 if (signal === TalkbackKind.Close) { 113 sink(push(0)); 114 } 115 }) 116 ); 117 }; 118 119 sinks.subscribe(fn)(source).unsubscribe(); 120 expect(fn).not.toHaveBeenCalled(); 121 }); 122}); 123 124describe('publish', () => { 125 it('sends Pull talkback signals every Push signal', () => { 126 let pulls = 0; 127 const source: Source<any> = sink => { 128 sink( 129 start(signal => { 130 if (signal === TalkbackKind.Pull) { 131 if (pulls < 3) { 132 pulls++; 133 sink(push(0)); 134 } else { 135 sink(SignalKind.End); 136 expect(pulls).toBe(3); 137 } 138 } 139 }) 140 ); 141 }; 142 143 sinks.publish(source); 144 expect(pulls).toBe(3); 145 }); 146}); 147 148describe('toArray', () => { 149 it('sends Pull talkback signals every Push signal', () => { 150 let pulls = 0; 151 const source: Source<any> = sink => { 152 sink( 153 start(signal => { 154 if (signal === TalkbackKind.Pull) { 155 if (pulls < 3) { 156 pulls++; 157 sink(push(0)); 158 } else { 159 sink(SignalKind.End); 160 expect(pulls).toBe(3); 161 } 162 } 163 }) 164 ); 165 }; 166 167 const array = sinks.toArray(source); 168 expect(array).toEqual([0, 0, 0]); 169 expect(pulls).toBe(3); 170 }); 171 172 it('sends a Close talkback signal after all synchronous values have been pulled', () => { 173 let pulls = 0; 174 let ending = 0; 175 176 const source: Source<any> = sink => { 177 sink( 178 start(signal => { 179 if (signal === TalkbackKind.Pull) { 180 if (!pulls) { 181 pulls++; 182 sink(push(0)); 183 } 184 } else { 185 ending++; 186 } 187 }) 188 ); 189 }; 190 191 const array = sinks.toArray(source); 192 expect(array).toEqual([0]); 193 expect(ending).toBe(1); 194 }); 195}); 196 197describe('toPromise', () => { 198 it('creates a Promise that resolves on the last value', async () => { 199 let pulls = 0; 200 let sink: Sink<any> | null = null; 201 202 const source: Source<any> = _sink => { 203 sink = _sink; 204 sink( 205 start(signal => { 206 if (signal === TalkbackKind.Pull) pulls++; 207 }) 208 ); 209 }; 210 211 const fn = vi.fn(); 212 const promise = sinks.toPromise(source).then(fn); 213 214 expect(pulls).toBe(1); 215 sink!(push(0)); 216 expect(pulls).toBe(2); 217 sink!(push(1)); 218 sink!(SignalKind.End); 219 expect(fn).not.toHaveBeenCalled(); 220 221 await promise; 222 expect(fn).toHaveBeenCalledWith(1); 223 }); 224 225 it('creates a Promise for synchronous sources', async () => { 226 const fn = vi.fn(); 227 await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn); 228 expect(fn).toHaveBeenCalledWith(3); 229 }); 230}); 231 232describe('toObservable', () => { 233 it('creates an Observable mirroring the Wonka source', () => { 234 const next = vi.fn(); 235 const complete = vi.fn(); 236 let pulls = 0; 237 let sink: Sink<any> | null = null; 238 239 const source: Source<any> = _sink => { 240 sink = _sink; 241 sink( 242 start(signal => { 243 if (signal === TalkbackKind.Pull) pulls++; 244 }) 245 ); 246 }; 247 248 Observable.from(observable.toObservable(source) as any).subscribe({ 249 next, 250 complete, 251 }); 252 253 expect(pulls).toBe(1); 254 sink!(push(0)); 255 expect(next).toHaveBeenCalledWith(0); 256 sink!(push(1)); 257 expect(next).toHaveBeenCalledWith(1); 258 sink!(SignalKind.End); 259 expect(complete).toHaveBeenCalled(); 260 }); 261 262 it('forwards cancellations from the Observable as a talkback', () => { 263 let ending = 0; 264 const source: Source<any> = sink => 265 sink( 266 start(signal => { 267 if (signal === TalkbackKind.Close) ending++; 268 }) 269 ); 270 271 const sub = Observable.from(observable.toObservable(source) as any).subscribe({}); 272 273 expect(ending).toBe(0); 274 sub.unsubscribe(); 275 expect(ending).toBe(1); 276 }); 277}); 278 279describe('toCallbag', () => { 280 it('creates a Callbag mirroring the Wonka source', () => { 281 const fn = vi.fn(); 282 let pulls = 0; 283 let sink: Sink<any> | null = null; 284 285 const source: Source<any> = _sink => { 286 sink = _sink; 287 sink( 288 start(signal => { 289 if (signal === TalkbackKind.Pull) pulls++; 290 }) 291 ); 292 }; 293 294 callbagIterate(fn)(callbag.toCallbag(source)); 295 296 expect(pulls).toBe(1); 297 sink!(push(0)); 298 expect(fn).toHaveBeenCalledWith(0); 299 sink!(push(1)); 300 expect(fn).toHaveBeenCalledWith(1); 301 sink!(SignalKind.End); 302 }); 303 304 it('forwards cancellations from the Callbag as a talkback', () => { 305 let ending = 0; 306 const fn = vi.fn(); 307 308 const source: Source<any> = sink => 309 sink( 310 start(signal => { 311 if (signal === TalkbackKind.Pull) { 312 sink(push(0)); 313 } else { 314 ending++; 315 } 316 }) 317 ); 318 319 callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any)); 320 321 expect(fn.mock.calls).toEqual([[0]]); 322 expect(ending).toBe(1); 323 }); 324});