Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.1.0 12 kB view raw
1import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types'; 2import { push, start } from '../helpers'; 3 4/* This tests a noop operator for passive Pull talkback signals. 5 A Pull will be sent from the sink upwards and should pass through 6 the operator until the source receives it, which then pushes a 7 value down. */ 8export const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => { 9 it('responds to Pull talkback signals (spec)', () => { 10 let talkback: TalkbackFn | null = null; 11 let pushes = 0; 12 const values: any[] = []; 13 14 const source: Source<any> = sink => { 15 sink( 16 start(signal => { 17 if (!pushes && signal === TalkbackKind.Pull) { 18 pushes++; 19 sink(push(0)); 20 } 21 }) 22 ); 23 }; 24 25 const sink: Sink<any> = signal => { 26 expect(signal).not.toBe(SignalKind.End); 27 if (signal === SignalKind.End) { 28 /*noop*/ 29 } else if (signal.tag === SignalKind.Push) { 30 values.push(signal[0]); 31 } else { 32 talkback = signal[0]; 33 } 34 }; 35 36 operator(source)(sink); 37 // The Start signal should always come in immediately 38 expect(talkback).not.toBe(null); 39 // No Push signals should be issued initially 40 expect(values).toEqual([]); 41 42 // When pulling a value we expect an immediate response 43 talkback!(TalkbackKind.Pull); 44 jest.runAllTimers(); 45 expect(values).toEqual([output]); 46 }); 47}; 48 49/* This tests a noop operator for regular, active Push signals. 50 A Push will be sent downwards from the source, through the 51 operator to the sink. Pull events should be let through from 52 the sink after every Push event. */ 53export const passesActivePush = (operator: Operator<any, any>, result: any = 0) => { 54 it('responds to eager Push signals (spec)', () => { 55 const values: any[] = []; 56 let talkback: TalkbackFn | null = null; 57 let sink: Sink<any> | null = null; 58 let pulls = 0; 59 60 const source: Source<any> = _sink => { 61 (sink = _sink)( 62 start(signal => { 63 if (signal === TalkbackKind.Pull) pulls++; 64 }) 65 ); 66 }; 67 68 operator(source)(signal => { 69 expect(signal).not.toBe(SignalKind.End); 70 if (signal === SignalKind.End) { 71 /*noop*/ 72 } else if (signal.tag === SignalKind.Start) { 73 talkback = signal[0]; 74 } else if (signal.tag === SignalKind.Push) { 75 values.push(signal[0]); 76 talkback!(TalkbackKind.Pull); 77 } 78 }); 79 80 // No Pull signals should be issued initially 81 expect(pulls).toBe(0); 82 83 // When pushing a value we expect an immediate response 84 sink!(push(0)); 85 jest.runAllTimers(); 86 expect(values).toEqual([result]); 87 // Subsequently the Pull signal should have travelled upwards 88 expect(pulls).toBe(1); 89 }); 90}; 91 92/* This tests a noop operator for Close talkback signals from the sink. 93 A Close signal will be sent, which should be forwarded to the source, 94 which then ends the communication without sending an End signal. */ 95export const passesSinkClose = (operator: Operator<any, any>) => { 96 it('responds to Close signals from sink (spec)', () => { 97 let talkback: TalkbackFn | null = null; 98 let closing = 0; 99 100 const source: Source<any> = sink => { 101 sink( 102 start(signal => { 103 if (signal === TalkbackKind.Pull && !closing) { 104 sink(push(0)); 105 } else if (signal === TalkbackKind.Close) { 106 closing++; 107 } 108 }) 109 ); 110 }; 111 112 const sink: Sink<any> = signal => { 113 expect(signal).not.toBe(SignalKind.End); 114 if (signal === SignalKind.End) { 115 /*noop*/ 116 } else if (signal.tag === SignalKind.Push) { 117 talkback!(TalkbackKind.Close); 118 } else { 119 talkback = signal[0]; 120 } 121 }; 122 123 operator(source)(sink); 124 125 // When pushing a value we expect an immediate close signal 126 talkback!(TalkbackKind.Pull); 127 jest.runAllTimers(); 128 expect(closing).toBe(1); 129 }); 130}; 131 132/* This tests a noop operator for End signals from the source. 133 A Push and End signal will be sent after the first Pull talkback 134 signal from the sink, which shouldn't lead to any extra Close or Pull 135 talkback signals. */ 136export const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => { 137 it('passes on immediate Push then End signals from source (spec)', () => { 138 const signals: Signal<any>[] = []; 139 let talkback: TalkbackFn | null = null; 140 let pulls = 0; 141 let ending = 0; 142 143 const source: Source<any> = sink => { 144 sink( 145 start(signal => { 146 expect(signal).not.toBe(TalkbackKind.Close); 147 if (signal === TalkbackKind.Pull) { 148 pulls++; 149 if (pulls === 1) { 150 sink(push(0)); 151 sink(SignalKind.End); 152 } 153 } 154 }) 155 ); 156 }; 157 158 const sink: Sink<any> = signal => { 159 if (signal === SignalKind.End) { 160 signals.push(signal); 161 ending++; 162 } else if (signal.tag === SignalKind.Push) { 163 signals.push(signal); 164 } else { 165 talkback = signal[0]; 166 } 167 }; 168 169 operator(source)(sink); 170 171 // When pushing a value we expect an immediate Push then End signal 172 talkback!(TalkbackKind.Pull); 173 jest.runAllTimers(); 174 expect(ending).toBe(1); 175 expect(signals).toEqual([push(result), SignalKind.End]); 176 // Also no additional pull event should be created by the operator 177 expect(pulls).toBe(1); 178 }); 179}; 180 181/* This tests a noop operator for End signals from the source 182 after the first pull in response to another. 183 This is similar to passesSourceEnd but more well behaved since 184 mergeMap/switchMap/concatMap are eager operators. */ 185export const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => { 186 it('passes on End signals from source (spec)', () => { 187 const signals: Signal<any>[] = []; 188 let talkback: TalkbackFn | null = null; 189 let pulls = 0; 190 let ending = 0; 191 192 const source: Source<any> = sink => { 193 sink( 194 start(signal => { 195 expect(signal).not.toBe(TalkbackKind.Close); 196 if (signal === TalkbackKind.Pull) { 197 pulls++; 198 if (pulls <= 2) { 199 sink(push(0)); 200 } else { 201 sink(SignalKind.End); 202 } 203 } 204 }) 205 ); 206 }; 207 208 const sink: Sink<any> = signal => { 209 if (signal === SignalKind.End) { 210 signals.push(signal); 211 ending++; 212 } else if (signal.tag === SignalKind.Push) { 213 signals.push(signal); 214 talkback!(TalkbackKind.Pull); 215 } else { 216 talkback = signal[0]; 217 } 218 }; 219 220 operator(source)(sink); 221 222 // When pushing a value we expect an immediate Push then End signal 223 talkback!(TalkbackKind.Pull); 224 jest.runAllTimers(); 225 expect(ending).toBe(1); 226 expect(pulls).toBe(3); 227 expect(signals).toEqual([push(result), push(result), SignalKind.End]); 228 }); 229}; 230 231/* This tests a noop operator for Start signals from the source. 232 When the operator's sink is started by the source it'll receive 233 a Start event. As a response it should never send more than one 234 Start signals to the sink. */ 235export const passesSingleStart = (operator: Operator<any, any>) => { 236 it('sends a single Start event to the incoming sink (spec)', () => { 237 let starts = 0; 238 239 const source: Source<any> = sink => { 240 sink(start(() => {})); 241 }; 242 243 const sink: Sink<any> = signal => { 244 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { 245 starts++; 246 } 247 }; 248 249 // When starting the operator we expect a single start event on the sink 250 operator(source)(sink); 251 expect(starts).toBe(1); 252 }); 253}; 254 255/* This tests a noop operator for silence after End signals from the source. 256 When the operator receives the End signal it shouldn't forward any other 257 signals to the sink anymore. 258 This isn't a strict requirement, but some operators should ensure that 259 all sources are well behaved. This is particularly true for operators 260 that either Close sources themselves or may operate on multiple sources. */ 261export const passesStrictEnd = (operator: Operator<any, any>) => { 262 it('stops all signals after End has been received (spec: strict end)', () => { 263 let pulls = 0; 264 const signals: Signal<any>[] = []; 265 266 const source: Source<any> = sink => { 267 sink( 268 start(signal => { 269 if (signal === TalkbackKind.Pull) { 270 pulls++; 271 sink(SignalKind.End); 272 sink(push(123)); 273 } 274 }) 275 ); 276 }; 277 278 const sink: Sink<any> = signal => { 279 if (signal === SignalKind.End) { 280 signals.push(signal); 281 } else if (signal.tag === SignalKind.Push) { 282 signals.push(signal); 283 } else { 284 signal[0](TalkbackKind.Pull); 285 } 286 }; 287 288 operator(source)(sink); 289 290 // The Push signal should've been dropped 291 jest.runAllTimers(); 292 expect(signals).toEqual([SignalKind.End]); 293 expect(pulls).toBe(1); 294 }); 295 296 it('stops all signals after Close has been received (spec: strict close)', () => { 297 const signals: Signal<any>[] = []; 298 299 const source: Source<any> = sink => { 300 sink( 301 start(signal => { 302 if (signal === TalkbackKind.Close) { 303 sink(push(123)); 304 } 305 }) 306 ); 307 }; 308 309 const sink: Sink<any> = signal => { 310 if (signal === SignalKind.End) { 311 signals.push(signal); 312 } else if (signal.tag === SignalKind.Push) { 313 signals.push(signal); 314 } else { 315 signal[0](TalkbackKind.Close); 316 } 317 }; 318 319 operator(source)(sink); 320 321 // The Push signal should've been dropped 322 jest.runAllTimers(); 323 expect(signals).toEqual([]); 324 }); 325}; 326 327/* This tests an immediately closing operator for End signals to 328 the sink and Close signals to the source. 329 When an operator closes immediately we expect to see a Close 330 signal at the source and an End signal to the sink, since the 331 closing operator is expected to end the entire chain. */ 332export const passesCloseAndEnd = (closingOperator: Operator<any, any>) => { 333 it('closes the source and ends the sink correctly (spec: ending operator)', () => { 334 let closing = 0; 335 let ending = 0; 336 337 const source: Source<any> = sink => { 338 sink( 339 start(signal => { 340 // For some operator tests we do need to send a single value 341 if (signal === TalkbackKind.Pull) { 342 sink(push(null)); 343 } else { 344 closing++; 345 } 346 }) 347 ); 348 }; 349 350 const sink: Sink<any> = signal => { 351 if (signal === SignalKind.End) { 352 ending++; 353 } else if (signal.tag === SignalKind.Start) { 354 signal[0](TalkbackKind.Pull); 355 } 356 }; 357 358 // We expect the operator to immediately end and close 359 closingOperator(source)(sink); 360 expect(closing).toBe(1); 361 expect(ending).toBe(1); 362 }); 363}; 364 365export const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => { 366 it('passes an async push with an async end (spec)', () => { 367 let hasPushed = false; 368 const signals: Signal<any>[] = []; 369 370 const source: Source<any> = sink => { 371 sink( 372 start(signal => { 373 if (signal === TalkbackKind.Pull && !hasPushed) { 374 hasPushed = true; 375 setTimeout(() => sink(push(0)), 10); 376 setTimeout(() => sink(SignalKind.End), 20); 377 } 378 }) 379 ); 380 }; 381 382 const sink: Sink<any> = signal => { 383 if (signal === SignalKind.End) { 384 signals.push(signal); 385 } else if (signal.tag === SignalKind.Push) { 386 signals.push(signal); 387 } else { 388 setTimeout(() => { 389 signal[0](TalkbackKind.Pull); 390 }, 5); 391 } 392 }; 393 394 // We initially expect to see the push signal 395 // Afterwards after all timers all other signals come in 396 operator(source)(sink); 397 expect(signals.length).toBe(0); 398 jest.advanceTimersByTime(5); 399 expect(hasPushed).toBeTruthy(); 400 jest.runAllTimers(); 401 402 expect(signals).toEqual([push(result), SignalKind.End]); 403 }); 404};