Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v4.0.0-rc.2 7.1 kB view raw
1import * as deriving from './helpers/wonka_deriving'; 2import * as sinks from './wonka_sinks.gen'; 3import * as web from './web/wonkaJs.gen'; 4import * as types from './wonka_types.gen'; 5 6import Observable from 'zen-observable'; 7import callbagIterate from 'callbag-iterate'; 8import callbagTake from 'callbag-take'; 9 10describe('subscribe', () => { 11 it('sends Pull talkback signals every Push signal', () => { 12 let pulls = 0; 13 const fn = jest.fn(); 14 15 const source: types.sourceT<any> = sink => { 16 sink(deriving.start(tb => { 17 if (tb === deriving.pull) { 18 if (pulls < 3) { 19 pulls++; 20 sink(deriving.push(0)); 21 } else { 22 sink(deriving.end()); 23 expect(pulls).toBe(3); 24 } 25 } 26 })); 27 }; 28 29 sinks.subscribe(fn)(source); 30 expect(fn).toHaveBeenCalledTimes(3); 31 expect(pulls).toBe(3); 32 }); 33 34 it('cancels when unsubscribe is called', () => { 35 let pulls = 0; 36 let closing = 0; 37 38 const source: types.sourceT<any> = sink => { 39 sink(deriving.start(tb => { 40 if (tb === deriving.pull) { 41 if (!pulls) { 42 pulls++; 43 sink(deriving.push(0)); 44 } 45 } else if (tb === deriving.close) { 46 closing++; 47 } 48 })); 49 }; 50 51 const sub = sinks.subscribe(() => {})(source); 52 expect(pulls).toBe(1); 53 54 sub.unsubscribe(); 55 expect(closing).toBe(1); 56 }); 57 58 it('ignores cancellation when the source has already ended', () => { 59 let pulls = 0; 60 let closing = 0; 61 62 const source: types.sourceT<any> = sink => { 63 sink(deriving.start(tb => { 64 if (tb === deriving.pull) { 65 pulls++; 66 sink(deriving.end()); 67 } else if (tb === deriving.close) { 68 closing++; 69 } 70 })); 71 }; 72 73 const sub = sinks.subscribe(() => {})(source); 74 expect(pulls).toBe(1); 75 sub.unsubscribe(); 76 expect(closing).toBe(0); 77 }); 78 79 it('ignores Push signals after the source has ended', () => { 80 const fn = jest.fn(); 81 const source: types.sourceT<any> = sink => { 82 sink(deriving.start(tb => { 83 if (tb === deriving.pull) { 84 sink(deriving.end()); 85 sink(deriving.push(0)); 86 } 87 })); 88 }; 89 90 sinks.subscribe(fn)(source); 91 expect(fn).not.toHaveBeenCalled(); 92 }); 93 94 it('ignores Push signals after cancellation', () => { 95 const fn = jest.fn(); 96 const source: types.sourceT<any> = sink => { 97 sink(deriving.start(tb => { 98 if (tb === deriving.close) { 99 sink(deriving.push(0)); 100 } 101 })); 102 }; 103 104 sinks.subscribe(fn)(source).unsubscribe(); 105 expect(fn).not.toHaveBeenCalled(); 106 }); 107}); 108 109describe('publish', () => { 110 it('sends Pull talkback signals every Push signal', () => { 111 let pulls = 0; 112 const source: types.sourceT<any> = sink => { 113 sink(deriving.start(tb => { 114 if (tb === deriving.pull) { 115 if (pulls < 3) { 116 pulls++; 117 sink(deriving.push(0)); 118 } else { 119 sink(deriving.end()); 120 expect(pulls).toBe(3); 121 } 122 } 123 })); 124 }; 125 126 sinks.publish(source); 127 expect(pulls).toBe(3); 128 }); 129}); 130 131describe('toArray', () => { 132 it('sends Pull talkback signals every Push signal', () => { 133 let pulls = 0; 134 const source: types.sourceT<any> = sink => { 135 sink(deriving.start(tb => { 136 if (tb === deriving.pull) { 137 if (pulls < 3) { 138 pulls++; 139 sink(deriving.push(0)); 140 } else { 141 sink(deriving.end()); 142 expect(pulls).toBe(3); 143 } 144 } 145 })); 146 }; 147 148 const array = sinks.toArray(source); 149 expect(array).toEqual([0, 0, 0]); 150 expect(pulls).toBe(3); 151 }); 152 153 it('sends a Close talkback signal after all synchronous values have been pulled', () => { 154 let pulls = 0; 155 let ending = 0; 156 157 const source: types.sourceT<any> = sink => { 158 sink(deriving.start(tb => { 159 if (tb === deriving.pull) { 160 if (!pulls) { 161 pulls++; 162 sink(deriving.push(0)); 163 } 164 } else if (tb === deriving.close) { 165 ending++; 166 } 167 })); 168 }; 169 170 const array = sinks.toArray(source); 171 expect(array).toEqual([0]); 172 expect(ending).toBe(1); 173 }); 174}); 175 176describe('toPromise', () => { 177 it('creates a Promise that resolves on the last value', async () => { 178 let pulls = 0; 179 let sink = null; 180 181 const source: types.sourceT<any> = _sink => { 182 sink = _sink; 183 sink(deriving.start(tb => { 184 if (tb === deriving.pull) 185 pulls++; 186 })); 187 }; 188 189 const fn = jest.fn(); 190 const promise = web.toPromise(source).then(fn); 191 192 expect(pulls).toBe(1); 193 sink(deriving.push(0)); 194 expect(pulls).toBe(2); 195 sink(deriving.push(1)); 196 sink(deriving.end()); 197 expect(fn).not.toHaveBeenCalled(); 198 199 await promise; 200 expect(fn).toHaveBeenCalledWith(1); 201 }); 202}); 203 204describe('toObservable', () => { 205 it('creates an Observable mirroring the Wonka source', () => { 206 const next = jest.fn(); 207 const complete = jest.fn(); 208 let pulls = 0; 209 let sink = null; 210 211 const source: types.sourceT<any> = _sink => { 212 sink = _sink; 213 sink(deriving.start(tb => { 214 if (tb === deriving.pull) 215 pulls++; 216 })); 217 }; 218 219 Observable.from(web.toObservable(source) as any).subscribe({ 220 next, 221 complete, 222 }); 223 224 expect(pulls).toBe(1); 225 sink(deriving.push(0)); 226 expect(next).toHaveBeenCalledWith(0); 227 sink(deriving.push(1)); 228 expect(next).toHaveBeenCalledWith(1); 229 sink(deriving.end()); 230 expect(complete).toHaveBeenCalled(); 231 }); 232 233 it('forwards cancellations from the Observable as a talkback', () => { 234 let ending = 0; 235 const source: types.sourceT<any> = sink => 236 sink(deriving.start(tb => { 237 if (tb === deriving.close) 238 ending++; 239 })); 240 241 const sub = Observable.from(web.toObservable(source) as any).subscribe({}); 242 243 expect(ending).toBe(0); 244 sub.unsubscribe(); 245 expect(ending).toBe(1); 246 }); 247}); 248 249describe('toCallbag', () => { 250 it('creates a Callbag mirroring the Wonka source', () => { 251 const fn = jest.fn(); 252 let pulls = 0; 253 let sink = null; 254 255 const source: types.sourceT<any> = _sink => { 256 sink = _sink; 257 sink(deriving.start(tb => { 258 if (tb === deriving.pull) 259 pulls++; 260 })); 261 }; 262 263 callbagIterate(fn)(web.toCallbag(source)); 264 265 expect(pulls).toBe(1); 266 sink(deriving.push(0)); 267 expect(fn).toHaveBeenCalledWith(0); 268 sink(deriving.push(1)); 269 expect(fn).toHaveBeenCalledWith(1); 270 sink(deriving.end()); 271 }); 272 273 it('forwards cancellations from the Callbag as a talkback', () => { 274 let ending = 0; 275 const fn = jest.fn(); 276 277 const source: types.sourceT<any> = sink => 278 sink(deriving.start(tb => { 279 if (tb === deriving.pull) 280 sink(deriving.push(0)); 281 if (tb === deriving.close) 282 ending++; 283 })); 284 285 callbagIterate(fn)(callbagTake(1)(web.toCallbag(source) as any)); 286 287 expect(fn.mock.calls).toEqual([[0]]); 288 expect(ending).toBe(1); 289 }); 290});