Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.3 7.4 kB view raw
import * as deriving from './helpers/wonka_deriving';
import * as sinks from './wonka_sinks.gen';
import * as sources from './wonka_sources.gen';
import * as web from './web/wonkaJs.gen';
import * as types from './wonka_types.gen';

import Observable from 'zen-observable';
import callbagIterate from 'callbag-iterate';
import callbagTake from 'callbag-take';

/** Counters that a hand-written test source updates as talkback signals arrive. */
interface TalkbackTally {
  pulls: number;
  closes: number;
}

/** Creates a tally with both counters at zero. */
const newTally = (): TalkbackTally => ({ pulls: 0, closes: 0 });

/**
 * Source that answers each of the first three Pull signals by pushing `0`
 * (counting it on `tally.pulls` before pushing) and answers any later Pull
 * by ending, at which point it asserts that exactly three pulls were counted.
 */
const pushThriceThenEnd = (tally: TalkbackTally): types.sourceT<any> => sink => {
  sink(
    deriving.start(signal => {
      if (signal !== deriving.pull) return;

      if (tally.pulls >= 3) {
        sink(deriving.end());
        expect(tally.pulls).toBe(3);
        return;
      }

      // Count first: the sink may talk back synchronously while handling the push.
      tally.pulls += 1;
      sink(deriving.push(0));
    })
  );
};

/**
 * Source that pushes `0` in response to the first Pull only, ignores later
 * Pulls, and counts every Close signal on `tally.closes`.
 */
const pushOnceCountingCloses = (tally: TalkbackTally): types.sourceT<any> => sink => {
  sink(
    deriving.start(signal => {
      if (signal === deriving.pull) {
        if (tally.pulls === 0) {
          tally.pulls += 1;
          sink(deriving.push(0));
        }
      } else if (signal === deriving.close) {
        tally.closes += 1;
      }
    })
  );
};

describe('subscribe', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const tally = newTally();
    const onValue = jest.fn();

    sinks.subscribe(onValue)(pushThriceThenEnd(tally));

    expect(onValue).toHaveBeenCalledTimes(3);
    expect(tally.pulls).toBe(3);
  });

  it('cancels when unsubscribe is called', () => {
    const tally = newTally();

    const subscription = sinks.subscribe(() => {})(pushOnceCountingCloses(tally));
    expect(tally.pulls).toBe(1);

    subscription.unsubscribe();
    expect(tally.closes).toBe(1);
  });

  it('ignores cancellation when the source has already ended', () => {
    const tally = newTally();

    // Ends immediately on the first Pull; any Close seen afterwards is counted.
    const source: types.sourceT<any> = sink => {
      sink(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            tally.pulls += 1;
            sink(deriving.end());
          } else if (signal === deriving.close) {
            tally.closes += 1;
          }
        })
      );
    };

    const subscription = sinks.subscribe(() => {})(source);
    expect(tally.pulls).toBe(1);
    subscription.unsubscribe();
    expect(tally.closes).toBe(0);
  });

  it('ignores Push signals after the source has ended', () => {
    const onValue = jest.fn();

    // Misbehaving source: emits a Push right after it has already ended.
    const source: types.sourceT<any> = sink => {
      sink(
        deriving.start(signal => {
          if (signal !== deriving.pull) return;
          sink(deriving.end());
          sink(deriving.push(0));
        })
      );
    };

    sinks.subscribe(onValue)(source);
    expect(onValue).not.toHaveBeenCalled();
  });

  it('ignores Push signals after cancellation', () => {
    const onValue = jest.fn();

    // Misbehaving source: emits a Push in response to being closed.
    const source: types.sourceT<any> = sink => {
      sink(
        deriving.start(signal => {
          if (signal === deriving.close) {
            sink(deriving.push(0));
          }
        })
      );
    };

    const subscription = sinks.subscribe(onValue)(source);
    subscription.unsubscribe();
    expect(onValue).not.toHaveBeenCalled();
  });
});

describe('publish', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const tally = newTally();

    sinks.publish(pushThriceThenEnd(tally));

    expect(tally.pulls).toBe(3);
  });
});

describe('toArray', () => {
  it('sends Pull talkback signals every Push signal', () => {
    const tally = newTally();

    const collected = sinks.toArray(pushThriceThenEnd(tally));

    expect(collected).toEqual([0, 0, 0]);
    expect(tally.pulls).toBe(3);
  });

  it('sends a Close talkback signal after all synchronous values have been pulled', () => {
    const tally = newTally();

    const collected = sinks.toArray(pushOnceCountingCloses(tally));

    expect(collected).toEqual([0]);
    expect(tally.closes).toBe(1);
  });
});

describe('toPromise', () => {
  it('creates a Promise that resolves on the last value', async () => {
    let pulls = 0;
    // Holds the sink handed to the source so the test can emit signals by hand.
    let emit = null;

    const source: types.sourceT<any> = downstream => {
      emit = downstream;
      downstream(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            pulls += 1;
          }
        })
      );
    };

    const onResolve = jest.fn();
    const settled = web.toPromise(source).then(onResolve);

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(pulls).toBe(2);
    emit(deriving.push(1));
    emit(deriving.end());
    // The `.then` callback has not run synchronously, even though the source ended.
    expect(onResolve).not.toHaveBeenCalled();

    await settled;
    expect(onResolve).toHaveBeenCalledWith(1);
  });

  it('creates a Promise for synchronous sources', async () => {
    const onResolve = jest.fn();

    await web.toPromise(sources.fromArray([1, 2, 3])).then(onResolve);

    expect(onResolve).toHaveBeenCalledWith(3);
  });
});

describe('toObservable', () => {
  it('creates an Observable mirroring the Wonka source', () => {
    const next = jest.fn();
    const complete = jest.fn();
    let pulls = 0;
    // Holds the sink handed to the source so the test can emit signals by hand.
    let emit = null;

    const source: types.sourceT<any> = downstream => {
      emit = downstream;
      downstream(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            pulls += 1;
          }
        })
      );
    };

    const observable = Observable.from(web.toObservable(source) as any);
    observable.subscribe({ next, complete });

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(next).toHaveBeenCalledWith(0);
    emit(deriving.push(1));
    expect(next).toHaveBeenCalledWith(1);
    emit(deriving.end());
    expect(complete).toHaveBeenCalled();
  });

  it('forwards cancellations from the Observable as a talkback', () => {
    let closes = 0;

    const source: types.sourceT<any> = sink =>
      sink(
        deriving.start(signal => {
          if (signal === deriving.close) {
            closes += 1;
          }
        })
      );

    const observable = Observable.from(web.toObservable(source) as any);
    const subscription = observable.subscribe({});

    expect(closes).toBe(0);
    subscription.unsubscribe();
    expect(closes).toBe(1);
  });
});

describe('toCallbag', () => {
  it('creates a Callbag mirroring the Wonka source', () => {
    const onValue = jest.fn();
    let pulls = 0;
    // Holds the sink handed to the source so the test can emit signals by hand.
    let emit = null;

    const source: types.sourceT<any> = downstream => {
      emit = downstream;
      downstream(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            pulls += 1;
          }
        })
      );
    };

    const callbag = web.toCallbag(source);
    callbagIterate(onValue)(callbag);

    expect(pulls).toBe(1);
    emit(deriving.push(0));
    expect(onValue).toHaveBeenCalledWith(0);
    emit(deriving.push(1));
    expect(onValue).toHaveBeenCalledWith(1);
    emit(deriving.end());
  });

  it('forwards cancellations from the Callbag as a talkback', () => {
    let closes = 0;
    const onValue = jest.fn();

    // Pushes `0` on every Pull and counts Close signals.
    const source: types.sourceT<any> = sink =>
      sink(
        deriving.start(signal => {
          if (signal === deriving.pull) {
            sink(deriving.push(0));
          }
          if (signal === deriving.close) {
            closes += 1;
          }
        })
      );

    const firstOnly = callbagTake(1)(web.toCallbag(source) as any);
    callbagIterate(onValue)(firstOnly);

    expect(onValue.mock.calls).toEqual([[0]]);
    expect(closes).toBe(1);
  });
});