Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
---
title: Operators
order: 1
---

Operators in Wonka allow you to transform values from a source before they are sent to a sink. Wonka has the following operators.

## buffer

Buffers emissions from an outer source and emits a buffer array of items every time an
inner source (notifier) emits.

This operator can be used to group the values of a source into arrays. The emitted values will
be sent when a notifier fires and will be arrays of all items before the notification event.

In combination with `interval` this can be used to group values in chunks regularly.

```typescript
import { pipe, interval, buffer, take, subscribe } from 'wonka';

pipe(
  interval(50),
  buffer(interval(100)),
  take(2),
  subscribe((buffer) => {
    buffer.forEach((x) => console.log(x));
    console.log(';');
  })
); // Prints 1 2; 2 3 to the console.
```

## combine

`combine` joins two sources into a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source _before_ further values of the second source are emitted.

```typescript
import { fromArray, pipe, combine, subscribe } from 'wonka';

const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);

pipe(
  combine(sourceOne, sourceTwo),
  subscribe(([valOne, valTwo]) => {
    console.log(valOne + valTwo);
  })
); // Prints 5 6 7 8 9 (1+4, 2+4, 3+4, 3+5, 3+6) to the console.
```

## concat

`concat` combines an array of sources into one, subscribing to the next source after the previous source completes.

```typescript
import { fromArray, pipe, concat, subscribe } from 'wonka';

const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([6, 5, 4]);

pipe(
  concat([sourceOne, sourceTwo]),
  subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
```

## concatAll

`concatAll` will combine all sources emitted on an outer source together, subscribing to the
next source after the previous source completes.

It's very similar to `concat`, but instead accepts a source of sources as an input.

```typescript
import { pipe, fromArray, concatAll, subscribe } from 'wonka';

const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([6, 5, 4]);

pipe(
  fromArray([sourceOne, sourceTwo]),
  concatAll,
  subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
```

## concatMap

`concatMap` allows you to map values of an outer source to an inner source. The sink will not dispatch the `Pull` signal until the previous value has been emitted. This is in contrast to `mergeMap`, which will dispatch the `Pull` signal for new values even if the previous value has not yet been emitted.

```typescript
import { fromArray, pipe, concatMap, delay, fromValue, subscribe } from 'wonka';

const source = fromArray([1, 2, 3, 4, 5, 6]);

pipe(
  source,
  concatMap((val) => {
    // Each inner source emits its value after `val` seconds
    return pipe(fromValue(val), delay(val * 1000));
  }),
  subscribe((val) => console.log(val))
);
```

## delay

`delay` delays all emitted values of a source by the given number of milliseconds.

```typescript
import { pipe, fromArray, delay, subscribe } from 'wonka';

pipe(
  fromArray([1, 2]),
  delay(10),
  subscribe((val) => console.log(val))
);
// waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends
```

## debounce

`debounce` withholds the values of a source until no new values have been emitted for
a given number of milliseconds. Once this threshold of silence has been reached, the
last value that has been received will be emitted.
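
To make the timing rule concrete, here is a minimal, self-contained sketch that models debouncing over a list of timestamped events rather than a live source. It does not use Wonka and is not how the library implements `debounce`. The `TimedEvent` type and the `debounceEvents` function are hypothetical names introduced only for illustration, and the sketch assumes that a value still pending when the input ends is emitted once its silence window has elapsed.

```typescript
// Hypothetical model type: a value paired with the time (in ms) it arrived.
interface TimedEvent<T> {
  time: number;
  value: T;
}

// Simplified model of debouncing, not Wonka's implementation.
// A value survives only if no later event arrives within `ms` of it;
// its emission time is its arrival time plus the silence threshold.
function debounceEvents<T>(events: TimedEvent<T>[], ms: number): TimedEvent<T>[] {
  const output: TimedEvent<T>[] = [];
  for (let i = 0; i < events.length; i++) {
    const current = events[i];
    const isLast = i === events.length - 1;
    // Assumption: the final pending value is flushed after its window.
    if (isLast || events[i + 1].time - current.time >= ms) {
      output.push({ time: current.time + ms, value: current.value });
    }
  }
  return output;
}

// A burst of values 0..4 arriving every 10ms, followed by a final value 1.
const debounced = debounceEvents(
  [
    { time: 10, value: 0 },
    { time: 20, value: 1 },
    { time: 30, value: 2 },
    { time: 40, value: 3 },
    { time: 50, value: 4 },
    { time: 50, value: 1 },
  ],
  20
);

// Only the final value survives, emitted 20ms after it arrived (at 70ms).
console.log(debounced);
```

Every value in the burst is followed by another event less than 20ms later, so only the final value passes through.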

```typescript
import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';

const sourceA = pipe(interval(10), take(5));
const sourceB = fromValue(1);

pipe(
  concat([sourceA, sourceB]),
  debounce(() => 20),
  subscribe((val) => console.log(val))
);

// The five values from sourceA will be omitted.
// After these values and after 20ms of silence, `1` will be logged.
```

## filter

`filter` will remove values from a source by passing them through an iteratee that returns a `boolean`.

```typescript
import { fromArray, pipe, filter, subscribe } from 'wonka';

const isEven = (n) => n % 2 === 0;

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  filter(isEven),
  subscribe((val) => console.log(val))
);

// Prints 2 4 6 to the console.
```

## map

`map` will transform values from a source by passing them through an iteratee that returns a new value.

```typescript
import { fromArray, pipe, map, subscribe } from 'wonka';

const square = (n) => n * n;

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  map(square),
  subscribe((val) => console.log(val))
);

// Prints 1 4 9 16 25 36 to the console.
```

## merge

`merge` merges an array of sources together into a single source. It subscribes
to all sources that it's passed and emits all their values on the output source.

```typescript
import { fromArray, pipe, merge, subscribe } from 'wonka';

const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);

pipe(
  merge([sourceOne, sourceTwo]),
  subscribe((val) => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
```

## mergeAll

`mergeAll` will merge all sources emitted on an outer source into a single one.
It's very similar to `merge`, but instead accepts a source of sources as an input.

> _Note:_ This operator is also exported as `flatten`, which is just an alias for `mergeAll`.

```typescript
import { pipe, fromArray, mergeAll, subscribe } from 'wonka';

const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);

pipe(
  fromArray([sourceOne, sourceTwo]),
  mergeAll,
  subscribe((val) => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
```

## mergeMap

`mergeMap` allows you to map values of an outer source to an inner source.
This allows you to create nested sources for each emitted value, which will
all be merged into a single source, like with `mergeAll`.

Unlike `concatMap`, all inner sources will be subscribed to at the same time
and all their values will be emitted on the output source as they come in.

```typescript
import { pipe, fromArray, mergeMap, subscribe } from 'wonka';

pipe(
  fromArray([1, 2]),
  mergeMap((x) => fromArray([x - 1, x])),
  subscribe((val) => console.log(val))
); // Prints 0 1 1 2 to the console.
```

## onEnd

Run a callback when the `End` signal has been sent to the sink by the source, whether by way of the talkback passing the `End` signal or the source being exhausted of values.

```typescript
import { fromPromise, pipe, concat, onEnd, subscribe } from 'wonka';

const promiseOne = new Promise((resolve) => {
  setTimeout(() => {
    resolve('ResolveOne');
  }, 1000);
});
const promiseTwo = new Promise((resolve) => {
  setTimeout(() => {
    resolve('ResolveTwo');
  }, 2000);
});

const sourceOne = fromPromise(promiseOne);
const sourceTwo = fromPromise(promiseTwo);

pipe(
  concat([sourceOne, sourceTwo]),
  onEnd(() => console.log('onEnd')),
  subscribe((val) => console.log(val))
);

// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
```

## onPush

Run a callback on each `Push` signal sent to the sink by the source.

> _Note:_ This operator is also exported as `tap`, which is just an alias for `onPush`.

```typescript
import { fromArray, pipe, onPush, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  onPush((val) => console.log(`Push ${val}`)),
  subscribe((val) => console.log(val))
); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
```

## onStart

Run a callback when the `Start` signal is sent to the sink by the source.

```typescript
import { pipe, onStart, fromPromise, subscribe } from 'wonka';

const promise = new Promise((resolve) => {
  setTimeout(() => {
    resolve('Resolve');
  }, 1000);
});

pipe(
  fromPromise(promise),
  onStart(() => console.log('onStart')),
  subscribe((val) => console.log(val))
);

// Logs onStart to the console, pauses for one second to allow the timeout to finish,
// then logs "Resolve" to the console.
```

## sample

`sample` emits the previously emitted value from an outer source every time
an inner source (notifier) emits.

In combination with `interval` it can be used to get values from a noisy source
more regularly.

```typescript
import { pipe, interval, sample, take, subscribe } from 'wonka';

pipe(
  interval(10),
  sample(interval(100)),
  take(2),
  subscribe((x) => console.log(x))
); // Prints 10 20 to the console.
```

## scan

Accumulate emitted values of a source in an accumulator, similar to JavaScript's `reduce`.

```typescript
import { fromArray, pipe, scan, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  scan((acc, val) => acc + val, 0),
  subscribe((val) => console.log(val))
);

// Prints 1 3 6 10 15 21 to the console.
```

## share

`share` ensures that all subscriptions to the underlying source are shared.

By default Wonka's sources are lazy. They only instantiate themselves and begin
emitting signals when they're being subscribed to, since they're also immutable.
This means that when a source is used in multiple places, its underlying subscription
is not shared. Instead, the entire chain of sources and operators will be instantiated
separately every time.

The `share` operator prevents this by creating an output source that will reuse a single
subscription to the parent source, which will be unsubscribed from when no sinks are
listening to it anymore.

This is especially useful if you introduce side-effects to your sources,
for instance with `onStart`.

```typescript
import { pipe, never, onStart, share, publish } from 'wonka';

const source = pipe(
  never,
  onStart(() => console.log('start')),
  share
);

// Without share this would print "start" twice:
publish(source);
publish(source);
```

## skip

`skip` ignores the specified number of emissions from the source and emits everything after them.

```typescript
import { fromArray, pipe, skip, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  skip(2),
  subscribe((val) => console.log(val))
);

// Prints 3 4 5 6 to the console.
```

## skipUntil

Skip emissions from an outer source until an inner source (notifier) emits.

```typescript
import { interval, pipe, skipUntil, subscribe } from 'wonka';

const source = interval(100);
const notifier = interval(500);

pipe(
  source,
  skipUntil(notifier),
  subscribe((val) => console.log(val))
);

// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
// Then logs 4 5 6 7 8 9 10... to the console every 100ms.
```

## skipWhile

Skip values emitted from the source while they return `true` for the provided predicate function.

```typescript
import { fromArray, pipe, skipWhile, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  skipWhile((val) => val < 5),
  subscribe((val) => console.log(val))
);

// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
```

## switchMap

`switchMap` allows you to map values of an outer source to an inner source.
The inner source's values will be emitted on the returned output source. If
a new inner source is returned, because the outer source emitted a new value
before the previous inner source completed, the previous inner source is closed
and unsubscribed from.

This is similar to `concatMap` but instead of waiting for the last inner source to complete
before emitting values from the next, `switchMap` just cancels the previous inner source.

```typescript
import { pipe, interval, switchMap, take, subscribe } from 'wonka';

pipe(
  interval(50),
  // The inner interval is cancelled after its first value every time
  switchMap(() => interval(40)),
  take(3),
  subscribe((x) => console.log(x))
); // Prints 0 0 0 to the console (the first value of each inner interval)
```

## switchAll

`switchAll` will combine sources emitted on an outer source together, subscribing
to only one source at a time, and cancelling the previous inner source if it hasn't
ended when the next inner source is created.

It's very similar to `switchMap`, but instead accepts a source of sources.

```typescript
import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';

pipe(
  interval(50),
  map(() => interval(40)),
  switchAll,
  take(3),
  subscribe((x) => console.log(x))
); // Prints 0 0 0 to the console (the first value of each inner interval)
```

This example is practically identical to the `switchMap` example, but note
that `map` was used instead of using `switchMap` directly. This is because combining
`map` with a subsequent `switchAll` is the same as using `switchMap`.

## take

`take` emits only the specified number of values from the source before completing. `take` is the opposite of `skip`.

```typescript
import { fromArray, pipe, take, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  take(3),
  subscribe((val) => console.log(val))
);

// Prints 1 2 3 to the console.
```

## takeLast

`takeLast` will take only the last n emissions from the source.

```typescript
import { fromArray, pipe, takeLast, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3, 4, 5, 6]),
  takeLast(3),
  subscribe((val) => console.log(val))
);

// Prints 4 5 6 to the console.
```

## takeUntil

Take emissions from an outer source until an inner source (notifier) emits.

```typescript
import { interval, pipe, takeUntil, subscribe } from 'wonka';

const source = interval(100);
const notifier = interval(500);

pipe(
  source,
  takeUntil(notifier),
  subscribe((val) => console.log(val))
);

// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
// prints 3, pauses 100ms, then completes (notifier emits).
```

## takeWhile

Take emissions from the stream while they return `true` for the provided predicate function. If the first emission does not return `true`, no values will be `Push`ed to the sink.
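
The stopping rule, including the edge case where the first value already fails the predicate, can be illustrated on plain arrays. This is only an analogue and does not use Wonka: `takeWhileArray` is a hypothetical helper written for this sketch, and it assumes the source's values are all available up front.

```typescript
// Hypothetical plain-array analogue of `takeWhile`, not part of Wonka.
// It stops at the first value that fails the predicate and ignores everything
// after it, even values that would pass the predicate again.
function takeWhileArray<T>(values: T[], predicate: (value: T) => boolean): T[] {
  const taken: T[] = [];
  for (const value of values) {
    if (!predicate(value)) break;
    taken.push(value);
  }
  return taken;
}

const lessThanFive = (n: number): boolean => n < 5;

// Values are taken until the first one that is not less than five.
const prefix = takeWhileArray([1, 2, 3, 4, 5, 6], lessThanFive);
console.log(prefix); // [1, 2, 3, 4]

// The first value fails the predicate, so nothing is taken at all,
// although the later values 1 and 2 would pass it.
const nothing = takeWhileArray([7, 1, 2], lessThanFive);
console.log(nothing); // []
```

This is what separates `takeWhile` from `filter`: once the predicate fails, the result is complete.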

```typescript
import { pipe, fromArray, takeWhile, subscribe } from 'wonka';

const source = fromArray([1, 2, 3, 4, 5, 6]);

pipe(
  source,
  takeWhile((val) => val < 5),
  subscribe((val) => console.log(val))
);

// Prints 1 2 3 4 to the console.
```

## throttle

`throttle` emits values of a source, but after each value it will omit all values for
the given number of milliseconds. It enforces a time of silence after each value it
receives and skips values while the silence is still ongoing.

This is very similar to `debounce`, but instead of waiting for leading time before a
value it waits for trailing time after a value.

```typescript
import { pipe, interval, throttle, take, subscribe } from 'wonka';

pipe(
  interval(10),
  throttle(() => 50),
  take(2),
  subscribe((val) => console.log(val))
); // Outputs 0 6 to the console.
```
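
The silence window described for `throttle` can also be modelled without timers. The following is a minimal sketch over timestamped values, not Wonka's implementation: the `TimedValue` type and the `throttleEvents` function are hypothetical names introduced for illustration, and the event times are chosen so that no value lands exactly on a window boundary.

```typescript
// Hypothetical model type: a value paired with the time (in ms) it arrived.
interface TimedValue<T> {
  time: number;
  value: T;
}

// Simplified model of throttling, not Wonka's implementation.
// A value passes through only if the silence window opened by the
// previously emitted value has already elapsed.
function throttleEvents<T>(events: TimedValue<T>[], ms: number): T[] {
  const output: T[] = [];
  let silentUntil = -Infinity;
  for (const event of events) {
    if (event.time >= silentUntil) {
      output.push(event.value);
      silentUntil = event.time + ms; // start a new window of silence
    }
  }
  return output;
}

const throttled = throttleEvents(
  [
    { time: 0, value: 'a' },
    { time: 15, value: 'b' },
    { time: 30, value: 'c' },
    { time: 45, value: 'd' },
    { time: 60, value: 'e' },
    { time: 75, value: 'f' },
  ],
  40
);

// 'a' opens a window until 40ms, so 'b' and 'c' are skipped;
// 'd' opens a window until 85ms, so 'e' and 'f' are skipped.
console.log(throttled); // ['a', 'd']
```

Unlike the debounce rule, the first value of each burst is the one that passes through, and the values that follow it within the window are dropped.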