Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { it, expect, vi } from 'vitest';

import { Source, Sink, Operator, Signal, SignalKind, TalkbackKind, TalkbackFn } from '../types';
import { push, start } from '../helpers';

/* Spec check for passive Pull talkback signals on a noop operator.
   The sink issues a Pull, which has to travel up through the operator
   to the source. The source answers its first Pull by pushing `0`, and
   the sink is expected to end up with `output` as its only value. */
export const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
  it('responds to Pull talkback signals (spec)', () => {
    const received: any[] = [];
    let upstream: TalkbackFn | null = null;
    let hasPushed = false;

    // Source that stays silent until pulled and then pushes exactly once
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (hasPushed || signal !== TalkbackKind.Pull) return;
          hasPushed = true;
          observer(push(0));
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      // Unreachable when the expectation holds; narrows the signal type
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        received.push(signal[0]);
      } else {
        upstream = signal[0];
      }
    };

    operator(source)(sink);
    // Start has to arrive synchronously, handing us the talkback
    expect(upstream).not.toBe(null);
    // Nothing may be pushed before the sink asks for it
    expect(received).toEqual([]);

    // A Pull is expected to be answered with a value
    upstream!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(received).toEqual([output]);
  });
};

/* This tests a noop operator for regular, active Push signals.
   A Push will be sent downwards from the source, through the
   operator to the sink. Pull events should be let through from
   the sink after every Push event.
*/
export const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
  it('responds to eager Push signals (spec)', () => {
    const received: any[] = [];
    let upstream: TalkbackFn | null = null;
    let downstream: Sink<any> | null = null;
    let pullCount = 0;

    // The source only counts Pull talkbacks; the value is pushed manually
    // further down through the sink reference captured here
    const source: Source<any> = observer => {
      downstream = observer;
      observer(
        start(signal => {
          if (signal === TalkbackKind.Pull) pullCount += 1;
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      // Unreachable when the expectation holds; narrows the signal type
      if (signal === SignalKind.End) return;

      switch (signal.tag) {
        case SignalKind.Start:
          upstream = signal[0];
          break;
        case SignalKind.Push:
          // Every received value is answered with a Pull
          received.push(signal[0]);
          upstream!(TalkbackKind.Pull);
          break;
      }
    };

    operator(source)(sink);

    // Starting alone must not cause any Pull to reach the source
    expect(pullCount).toBe(0);

    // A pushed value is expected to come out at the sink
    downstream!(push(0));
    vi.runAllTimers();
    expect(received).toEqual([result]);
    // The sink's Pull response should have reached the source
    expect(pullCount).toBe(1);
  });
};

/* This tests a noop operator for Close talkback signals from the sink.
   A Close signal will be sent, which should be forwarded to the source,
   which then ends the communication without sending an End signal.
*/
export const passesSinkClose = (operator: Operator<any, any>) => {
  it('responds to Close signals from sink (spec)', () => {
    let upstream: TalkbackFn | null = null;
    let closeCount = 0;

    // Counts Close talkbacks and answers Pulls until the first Close is seen
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (signal === TalkbackKind.Close) {
            closeCount += 1;
          } else if (signal === TalkbackKind.Pull && closeCount === 0) {
            observer(push(0));
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      expect(signal).not.toBe(SignalKind.End);
      // Unreachable when the expectation holds; narrows the signal type
      if (signal === SignalKind.End) return;

      if (signal.tag === SignalKind.Push) {
        // Closing is the sink's reaction to the first value
        upstream!(TalkbackKind.Close);
      } else {
        upstream = signal[0];
      }
    };

    operator(source)(sink);

    // Pulling triggers a Push, which the sink answers with a Close that
    // is expected to arrive at the source exactly once
    upstream!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(closeCount).toBe(1);
  });
};

/* This tests a noop operator for End signals from the source.
   A Push and End signal will be sent after the first Pull talkback
   signal from the sink, which shouldn't lead to any extra Close or Pull
   talkback signals.
*/
export const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on immediate Push then End signals from source (spec)', () => {
    const log: Signal<any>[] = [];
    let upstream: TalkbackFn | null = null;
    let pullCount = 0;
    let endCount = 0;

    // Answers the first Pull with a Push directly followed by End and
    // rejects any Close talkback
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal !== TalkbackKind.Pull) return;

          pullCount += 1;
          if (pullCount === 1) {
            observer(push(0));
            observer(SignalKind.End);
          }
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        upstream = signal[0];
        return;
      }

      // Push and End signals are both recorded in arrival order
      log.push(signal);
      if (signal === SignalKind.End) endCount += 1;
    };

    operator(source)(sink);

    // A single Pull is expected to yield a Push followed by one End
    upstream!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(endCount).toBe(1);
    expect(log).toEqual([push(result), SignalKind.End]);
    // The operator must not have issued Pulls of its own
    expect(pullCount).toBe(1);
  });
};

/* This tests a noop operator for End signals from the source
   after the first pull in response to another.
   This is similar to passesSourceEnd but more well behaved since
   mergeMap/switchMap/concatMap are eager operators.
*/
export const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
  it('passes on End signals from source (spec)', () => {
    const log: Signal<any>[] = [];
    let upstream: TalkbackFn | null = null;
    let pullCount = 0;
    let endCount = 0;

    // Pushes a value for the first two Pulls and ends on any later one;
    // Close talkbacks are rejected
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          expect(signal).not.toBe(TalkbackKind.Close);
          if (signal !== TalkbackKind.Pull) return;

          pullCount += 1;
          observer(pullCount <= 2 ? push(0) : SignalKind.End);
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        log.push(signal);
        endCount += 1;
        return;
      }

      if (signal.tag === SignalKind.Push) {
        // Each value is recorded and answered with another Pull
        log.push(signal);
        upstream!(TalkbackKind.Pull);
      } else {
        upstream = signal[0];
      }
    };

    operator(source)(sink);

    // The initial Pull starts a Pull/Push exchange: two Pushes, then End
    upstream!(TalkbackKind.Pull);
    vi.runAllTimers();
    expect(endCount).toBe(1);
    expect(pullCount).toBe(3);
    expect(log).toEqual([push(result), push(result), SignalKind.End]);
  });
};

/* This tests a noop operator for Start signals from the source.
   When the operator's sink is started by the source it'll receive
   a Start event. As a response it should never send more than one
   Start signals to the sink.
*/
export const passesSingleStart = (operator: Operator<any, any>) => {
  it('sends a single Start event to the incoming sink (spec)', () => {
    let startCount = 0;

    // Source that starts the sink and ignores every talkback signal
    const source: Source<any> = observer => {
      observer(start(() => {}));
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) return;
      if (signal.tag === SignalKind.Start) startCount += 1;
    };

    // Starting the operator must produce exactly one Start at the sink
    operator(source)(sink);
    expect(startCount).toBe(1);
  });
};

/* Spec checks for silence after the stream has terminated.
   Once the operator has received End from the source it shouldn't
   forward any further signals to the sink, and likewise nothing should
   reach the sink after the sink has sent Close.
   This isn't a strict requirement, but some operators should ensure that
   all sources are well behaved. This is particularly true for operators
   that either Close sources themselves or may operate on multiple sources. */
export const passesStrictEnd = (operator: Operator<any, any>) => {
  it('stops all signals after End has been received (spec: strict end)', () => {
    const log: Signal<any>[] = [];
    let pullCount = 0;

    // Misbehaving source: emits End and then still pushes a value
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (signal !== TalkbackKind.Pull) return;
          pullCount += 1;
          observer(SignalKind.End);
          observer(push(123));
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Push) {
        log.push(signal);
      } else {
        // Pull immediately on Start
        signal[0](TalkbackKind.Pull);
      }
    };

    operator(source)(sink);

    // Only End may arrive; the late Push has to be dropped
    vi.runAllTimers();
    expect(log).toEqual([SignalKind.End]);
    expect(pullCount).toBe(1);
  });

  it('stops all signals after Close has been received (spec: strict close)', () => {
    const log: Signal<any>[] = [];

    // Misbehaving source: answers a Close talkback with a Push
    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (signal === TalkbackKind.Close) observer(push(123));
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Push) {
        log.push(signal);
      } else {
        // Close immediately on Start
        signal[0](TalkbackKind.Close);
      }
    };

    operator(source)(sink);

    // The Push sent in response to Close has to be dropped
    vi.runAllTimers();
    expect(log).toEqual([]);
  });
};

/* Spec check for an operator that closes immediately.
   Such an operator is expected to end the entire chain: the source
   should see a single Close talkback and the sink a single End signal. */
export const passesCloseAndEnd = (closingOperator: Operator<any, any>) => {
  it('closes the source and ends the sink correctly (spec: ending operator)', () => {
    let closeCount = 0;
    let endCount = 0;

    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (signal !== TalkbackKind.Pull) {
            closeCount += 1;
            return;
          }
          // Some operator tests need a single value before they close
          observer(push(null));
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End) {
        endCount += 1;
        return;
      }
      if (signal.tag === SignalKind.Start) signal[0](TalkbackKind.Pull);
    };

    // The operator is expected to end and close synchronously
    closingOperator(source)(sink);
    expect(closeCount).toBe(1);
    expect(endCount).toBe(1);
  });
};

/* Spec check for asynchronous sequences.
   The sink pulls 5ms after Start; the source answers its first Pull
   with a Push after 10ms and an End after 20ms. Once all timers have
   run, the sink should have seen `push(result)` followed by End.
   NOTE(review): relies on vi.advanceTimersByTime/vi.runAllTimers, so the
   calling suite presumably enables fake timers — not visible here. */
export const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
  it('passes an async push with an async end (spec)', () => {
    const log: Signal<any>[] = [];
    let hasPushed = false;

    const source: Source<any> = observer => {
      observer(
        start(signal => {
          if (hasPushed || signal !== TalkbackKind.Pull) return;
          hasPushed = true;
          setTimeout(() => observer(push(0)), 10);
          setTimeout(() => observer(SignalKind.End), 20);
        })
      );
    };

    const sink: Sink<any> = signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Push) {
        log.push(signal);
        return;
      }

      // Delay the first Pull so that the whole exchange is asynchronous
      const pull = signal[0];
      setTimeout(() => {
        pull(TalkbackKind.Pull);
      }, 5);
    };

    // Nothing may arrive synchronously; after 5ms the delayed Pull has
    // reached the source, and the remaining timers deliver Push and End
    operator(source)(sink);
    expect(log.length).toBe(0);
    vi.advanceTimersByTime(5);
    expect(hasPushed).toBeTruthy();
    vi.runAllTimers();

    expect(log).toEqual([push(result), SignalKind.End]);
  });
};