Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, Sink, SignalKind, TalkbackKind } from '../types';
import { push, start } from '../helpers';

import * as sinks from '../sinks';
import * as sources from '../sources';
import * as callbag from '../callbag';
import * as observable from '../observable';

import Observable from 'zen-observable';
import callbagIterate from 'callbag-iterate';
import callbagTake from 'callbag-take';

/**
 * Tests for `sinks.subscribe`, driven by hand-written sources so that every
 * talkback signal received from the sink can be counted.
 */
describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;
    const fn = jest.fn();

    // Answers the first three Pull signals with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    sinks.subscribe(fn)(source);
    expect(fn).toHaveBeenCalledTimes(3);
    expect(pulls).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    let pulls = 0;
    let closing = 0;

    // Pushes a single value on the first Pull and never ends on its own;
    // every non-Pull talkback signal is counted in `closing`.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (!pulls) {
              pulls++;
              sink(push(0));
            }
          } else {
            closing++;
          }
        })
      );
    };

    const sub = sinks.subscribe(() => {})(source);
    expect(pulls).toBe(1);

    sub.unsubscribe();
    expect(closing).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    let pulls = 0;
    let closing = 0;

    // Ends immediately on the first Pull; non-Pull signals are counted.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            pulls++;
            sink(SignalKind.End);
          } else {
            closing++;
          }
        })
      );
    };

    const sub = sinks.subscribe(() => {})(source);
    expect(pulls).toBe(1);
    sub.unsubscribe();
    expect(closing).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const fn = jest.fn();

    // Misbehaving source: pushes a value after it has already sent End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(SignalKind.End);
            sink(push(0));
          }
        })
      );
    };

    sinks.subscribe(fn)(source);
    expect(fn).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const fn = jest.fn();

    // Misbehaving source: pushes a value in response to the Close signal.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) {
            sink(push(0));
          }
        })
      );
    };

    sinks.subscribe(fn)(source).unsubscribe();
    expect(fn).not.toHaveBeenCalled();
  });
});

/** Tests for `sinks.publish`. */
describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;

    // Answers the first three Pull signals with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    sinks.publish(source);
    expect(pulls).toBe(3);
  });
});

/** Tests for `sinks.toArray`. */
describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    let pulls = 0;

    // Answers the first three Pull signals with a Push and the fourth with End.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (pulls < 3) {
              pulls++;
              sink(push(0));
            } else {
              sink(SignalKind.End);
              expect(pulls).toBe(3);
            }
          }
        })
      );
    };

    const array = sinks.toArray(source);
    expect(array).toEqual([0, 0, 0]);
    expect(pulls).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    let pulls = 0;
    let ending = 0;

    // Pushes one value on the first Pull, then stays silent without ending;
    // every non-Pull talkback signal is counted in `ending`.
    const source: Source<any> = sink => {
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            if (!pulls) {
              pulls++;
              sink(push(0));
            }
          } else {
            ending++;
          }
        })
      );
    };

    const array = sinks.toArray(source);
    expect(array).toEqual([0]);
    expect(ending).toBe(1);
  });
});

/** Tests for `sinks.toPromise`. */
describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // The source only counts Pull signals; the sink is captured so the test
    // can push values and end the stream by hand.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    const fn = jest.fn();
    const promise = sinks.toPromise(source).then(fn);

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(pulls).toBe(2);
    sink!(push(1));
    sink!(SignalKind.End);
    // `.then` callbacks run asynchronously, so `fn` cannot have run yet.
    expect(fn).not.toHaveBeenCalled();

    await promise;
    expect(fn).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const fn = jest.fn();
    await sinks.toPromise(sources.fromArray([1, 2, 3])).then(fn);
    expect(fn).toHaveBeenCalledWith(3);
  });
});

/** Tests for `observable.toObservable`, consumed through zen-observable. */
describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const next = jest.fn();
    const complete = jest.fn();
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // The source only counts Pull signals; the sink is captured so the test
    // can push values and end the stream by hand.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    Observable.from(observable.toObservable(source) as any).subscribe({
      next,
      complete,
    });

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(next).toHaveBeenCalledWith(0);
    sink!(push(1));
    expect(next).toHaveBeenCalledWith(1);
    sink!(SignalKind.End);
    expect(complete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let ending = 0;

    // Never pushes anything; only counts Close talkback signals.
    const source: Source<any> = sink =>
      sink(
        start(signal => {
          if (signal === TalkbackKind.Close) ending++;
        })
      );

    const sub = Observable.from(observable.toObservable(source) as any).subscribe({});

    expect(ending).toBe(0);
    sub.unsubscribe();
    expect(ending).toBe(1);
  });
});

/** Tests for `callbag.toCallbag`, consumed through callbag-iterate / callbag-take. */
describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const fn = jest.fn();
    let pulls = 0;
    let sink: Sink<any> | null = null;

    // The source only counts Pull signals; the sink is captured so the test
    // can push values and end the stream by hand.
    const source: Source<any> = _sink => {
      sink = _sink;
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) pulls++;
        })
      );
    };

    callbagIterate(fn)(callbag.toCallbag(source));

    expect(pulls).toBe(1);
    sink!(push(0));
    expect(fn).toHaveBeenCalledWith(0);
    sink!(push(1));
    expect(fn).toHaveBeenCalledWith(1);
    sink!(SignalKind.End);
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let ending = 0;
    const fn = jest.fn();

    // Pushes a value on every Pull; any other talkback signal is counted.
    const source: Source<any> = sink =>
      sink(
        start(signal => {
          if (signal === TalkbackKind.Pull) {
            sink(push(0));
          } else {
            ending++;
          }
        })
      );

    // `callbagTake(1)` limits the consumer to a single value.
    callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any));

    expect(fn.mock.calls).toEqual([[0]]);
    expect(ending).toBe(1);
  });
});