1---
2title: Operators
3order: 1
4---
5
6Operators in Wonka allow you to transform values from a source before they are sent to a sink. Wonka has the following operators.
7
8## buffer
9
10Buffers emissions from an outer source and emits a buffer array of items every time an
11inner source (notifier) emits.
12
13This operator can be used to group values into a arrays on a source. The emitted values will
14be sent when a notifier fires and will be arrays of all items before the notification event.
15
16In combination with `interval` this can be used to group values in chunks regularly.
17
18```typescript
19import { pipe, interval, buffer, take, subscribe } from 'wonka';
20
21pipe(
22 interval(50),
23 buffer(interval(100)),
24 take(2),
25 subscribe((buffer) => {
26 buffer.forEach((x) => console.log(x));
27 console.log(';');
28 })
29); // Prints 1 2; 2 3 to the console.
30```
31
32## combine
33
34`combine` two sources together to a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source _before_ values of the second source are emitted.
35
36```typescript
37import { fromArray, pipe, combine, subscribe } from 'wonka';
38
39const sourceOne = fromArray([1, 2, 3]);
40const sourceTwo = fromArray([4, 5, 6]);
41
42pipe(
43 combine(sourceOne, sourceTwo),
44 subscribe(([valOne, valTwo]) => {
45 console.log(valOne + valTwo);
46 })
47); // Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console.
48```
49
50## concat
51
52`concat` will combine two sources together, subscribing to the next source after the previous source completes.
53
54```typescript
55import { fromArray, pipe, concat, subscribe } from 'wonka';
56
57const sourceOne = fromArray([1, 2, 3]);
58const sourceTwo = fromArray([6, 5, 4]);
59
60pipe(
61 concat([sourceOne, sourceTwo]),
62 subscribe((val) => console.log(val))
63); // Prints 1 2 3 6 5 4 to the console.
64```
65
66## concatAll
67
68`concatAll` will combine all sources emitted on an outer source together, subscribing to the
69next source after the previous source completes.
70
71It's very similar to `concat`, but instead accepts a source of sources as an input.
72
73```typescript
74import { pipe, fromArray, concatAll, subscribe } from 'wonka';
75
76const sourceOne = fromArray([1, 2, 3]);
77const sourceTwo = fromArray([6, 5, 4]);
78
79pipe(
80 fromArray([sourceOne, sourceTwo]),
81 concatAll,
82 subscribe((val) => console.log(val))
83); // Prints 1 2 3 6 5 4 to the console.
84```
85
86## concatMap
87
88`concatMap` allows you to map values of an outer source to an inner source. The sink will not dispatch the `Pull` signal until the previous value has been emitted. This is in contrast to `mergeMap`, which will dispatch the `Pull` signal for new values even if the previous value has not yet been emitted.
89
90```typescript
91import { fromArray, pipe, concatMap, delay, fromValue, subscribe } from 'wonka';
92
93const source = fromArray([1, 2, 3, 4, 5, 6]);
94
95pipe(
96 source,
97 concatMap((val) => {
98 return pipe(fromValue(val), delay(val * 1000));
99 }),
100 subscribe((val) => console.log(val))
101);
102```
103
104## delay
105
106`delay` delays all emitted values of a source by the given amount of milliseconds.
107
108```typescript
109import { pipe, fromArray, delay, subscribe } from 'wonka';
110
111pipe(
112 fromArray([1, 2]),
113 delay(10)
114 subscribe(val => console.log(val))
115);
116// waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends
117```
118
119## debounce
120
121`debounce` doesn't emit values of a source until no values have been emitted after
122a given amount of milliseconds. Once this threshold of silence has been reached, the
123last value that has been received will be emitted.
124
125```typescript
126import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';
127
128const sourceA = pipe(interval(10), take(5));
129const sourceB = fromValue(1);
130
131pipe(
132 concat([sourceA, sourceB])
133 debounce(() => 20),
134 subscribe(val => console.log(val))
135);
136
137// The five values from sourceA will be omitted
138// After these values and after 20ms `1` will be logged
139```
140
141## filter
142
143`filter` will remove values from a source by passing them through an iteratee that returns a `bool`.
144
145```typescript
146import { fromArray, filter, subscribe } from 'wonka';
147
148const isEven = (n) => n % 2 === 0;
149
150pipe(
151 fromArray([1, 2, 3, 4, 5, 6]),
152 filter(isEven),
153 subscribe((val) => console.log(val))
154);
155
156// Prints 246 to the console.
157```
158
159## map
160
161`map` will transform values from a source by passing them through an iteratee that returns a new value.
162
163```typescript
164import { fromArray, pipe, map, subscribe } from 'wonka';
165
166const square = (n) => n * n;
167
168pipe(
169 fromArray([1, 2, 3, 4, 5, 6]),
170 map(square),
171 subscribe((val) => console.log(val))
172);
173
174// Prints 1 4 9 16 25 36 to the console.
175```
176
177## merge
178
179`merge` merges an array of sources together into a single source. It subscribes
180to all sources that it's passed and emits all their values on the output source.
181
182```typescript
183import { fromArray, pipe, merge, subscribe } from 'wonka';
184
185const sourceOne = fromArray([1, 2, 3]);
186const sourceTwo = fromArray([4, 5, 6]);
187
188pipe(
189 merge(sourceOne, sourceTwo),
190 subscribe((val) => console.log(val))
191); // Prints 1 2 3 4 5 6 to the console.
192```
193
194## mergeAll
195
196`mergeAll` will merge all sources emitted on an outer source into a single one.
197It's very similar to `merge`, but instead accepts a source of sources as an input.
198
199> _Note:_ This operator is also exported as `flatten` which is just an alias for `mergeAll`
200
201```typescript
202import { pipe, fromArray, mergeAll, subscribe } from 'wonka';
203
204const sourceOne = fromArray([1, 2, 3]);
205const sourceTwo = fromArray([4, 5, 6]);
206
207pipe(
208 fromArray([sourceOne, sourceTwo]),
209 mergeAll,
210 subscribe((val) => console.log(val))
211); // Prints 1 2 3 4 5 6 to the console.
212```
213
214## mergeMap
215
216`mergeMap` allows you to map values of an outer source to an inner source.
217This allows you to create nested sources for each emitted value, which will
218all be merged into a single source, like with `mergeAll`.
219
220Unlike `concatMap` all inner sources will be subscribed to at the same time
221and all their values will be emitted on the output source as they come in.
222
223```typescript
224import { pipe, fromArray, mergeMap, subscribe } from 'wonka';
225
226pipe(
227 fromArray([1, 2]),
228 mergeMap((x) => fromArray([x - 1, x])),
229 subscribe((val) => console.log(val))
230); // Prints 0 1 1 2 to the console.
231```
232
233## onEnd
234
235Run a callback when the `End` signal has been sent to the sink by the source, whether by way of the talkback passing the `End` signal or the source being exhausted of values.
236
237```typescript
238import { fromPromise, pipe, concat, onEnd, subscribe } from 'wonka';
239
240const promiseOne = new Promise((resolve) => {
241 setTimeout(() => {
242 resolve('ResolveOne');
243 }, 1000);
244});
245const promiseTwo = new Promise((resolve) => {
246 setTimeout(() => {
247 resolve('ResolveTwo');
248 }, 2000);
249});
250
251const sourceOne = fromPromise(promiseOne);
252const sourceTwo = fromPromise(promiseTwo);
253
254pipe(
255 concat([sourceOne, sourceTwo]),
256 onEnd(() => console.log('onEnd')),
257 subscribe((val) => console.log(val))
258);
259
260// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
261```
262
263## onPush
264
265Run a callback on each `Push` signal sent to the sink by the source.
266
267> _Note:_ This operator is also exported as `tap` which is just an alias for `onPush`
268
269```typescript
270import { fromArray, pipe, onPush, subscribe } from 'wonka';
271
272pipe(
273 fromArray([1, 2, 3, 4, 5, 6]),
274 onPush((val) => console.log(`Push ${val}`)),
275 subscribe((val) => console.log(val))
276); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
277```
278
279## onStart
280
281Run a callback when the `Start` signal is sent to the sink by the source.
282
283```typescript
284import { pipe, onStart, fromPromise, subscribe } from 'wonka';
285
286const promise = new Promise((resolve) => {
287 setTimeout(() => {
288 resolve('Resolve');
289 }, 1000);
290});
291
292pipe(
293 fromPromise(promise),
294 onStart(() => console.log('onStart')),
295 subscribe((val) => console.log(val))
296);
297
298// Logs onStart to the console, pauses for one second to allow the timeout to finish,
299// then logs "Resolve" to the console.
300```
301
302## sample
303
304`sample` emits the previously emitted value from an outer source every time
305an inner source (notifier) emits.
306
307In combination with `interval` it can be used to get values from a noisy source
308more regularly.
309
310```typescript
311import { pipe, interval, sample, take, subscribe } from 'wonka';
312
313pipe(
314 interval(10),
315 sample(interval(100)),
316 take(2),
317 subscribe((x) => console.log(x))
318); // Prints 10 20 to the console.
319```
320
321## scan
322
323Accumulate emitted values of a source in a accumulator, similar to JavaScript `reduce`.
324
325```typescript
326import { fromArray, pipe, scan, subscribe } from 'wonka';
327
328pipe(
329 fromArray([1, 2, 3, 4, 5, 6]),
330 scan((acc, val) => acc + val, 0),
331 subscribe((val) => console.log(val))
332);
333
334// Prints 1 3 6 10 15 21 to the console.
335```
336
337## share
338
339`share` ensures that all subscriptions to the underlying source are shared.
340
341By default Wonka's sources are lazy. They only instantiate themselves and begin
342emitting signals when they're being subscribed to, since they're also immutable.
343This means that when a source is used in multiple places, their underlying subscription
344is not shared. Instead, the entire chain of sources and operators will be instantiated
345separately every time.
346
347The `share` operator prevents this by creating an output source that will reuse a single
348subscription to the parent source, which will be unsubscribed from when no sinks are
349listening to it anymore.
350
351This is especially useful if you introduce side-effects to your sources,
352for instance with `onStart`.
353
354```typescript
355import { pipe, never, onStart, share, publish } from 'wonka';
356
357const source = pipe(
358 never
359 onStart(() => console.log('start')),
360 share
361);
362
363// Without share this would print "start" twice:
364publish(source);
365publish(source);
366```
367
368## skip
369
370`skip` the specified number of emissions from the source.
371
372```typescript
373import { fromArray, pipe, skip, subscribe } from 'wonka';
374
375pipe(
376 fromArray([1, 2, 3, 4, 5, 6]),
377 skip(2),
378 subscribe((val) => console.log(val))
379);
380```
381
382## skipUntil
383
384Skip emissions from an outer source until an inner source (notifier) emits.
385
386```typescript
387import { interval, pipe, skipUntil, subscribe } from 'wonka';
388
389const source = interval(100);
390const notifier = interval(500);
391
392pipe(
393 source,
394 skipUntil(notifier),
395 subscribe((val) => console.log(val))
396);
397
398// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
399// Then logs 4 5 6 7 8 9 10... to the console every 500ms.
400```
401
402## skipWhile
403
404Skip values emitted from the source while they return `true` for the provided predicate function.
405
406```typescript
407import { fromArray, pipe, skipWhile, subscribe } from 'wonka';
408
409pipe(
410 fromArray([1, 2, 3, 4, 5, 6]),
411 skipWhile((val) => val < 5),
412 subscribe((val) => console.log(val))
413);
414
415// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
416```
417
418## switchMap
419
420`switchMap` allows you to map values of an outer source to an inner source.
421The inner source's values will be emitted on the returned output source. If
422a new inner source is returned, because the outer source emitted a new value
423before the previous inner source completed, the inner source is closed and unsubscribed
424from.
425
426This is similar to `concatMap` but instead of waiting for the last inner source to complete
427before emitting values from the next, `switchMap` just cancels the previous inner source.
428
429```typescript
430import { pipe, interval, switchMap, take, subscribe } from 'wonka';
431
432pipe(
433 interval(50),
434 // The inner interval is cancelled after its first value every time
435 switchMap((value) => interval(40)),
436 take(3),
437 subscribe((x) => console.log(x))
438); // Prints 1 2 3 to the console
439```
440
441## switchAll
442
443`switchAll` will combined sources emitted on an outer source together, subscribing
444to only one source at a time, and cancelling the previous inner source, when it hasn't
445ended while the next inner source is created.
446
447It's very similar to `switchMap`, but instead accepts a source of sources.
448
449```typescript
450import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';
451
452pipe(
453 interval(50),
454 map(() => interval(40)),
455 switchAll,
456 take(3),
457 subscribe((x) => console.log(x))
458); // Prints 1 2 3 to the console
459```
460
461These examples are practically identical to the `switchMap` examples, but note
462that `map` was used instead of using `switchMap` directly. This is because combining
463`map` with a subsequent `switchAll` is the same as using `switchMap`.
464
465## take
466
467`take` only a specified number of emissions from the source before completing. `take` is the opposite of `skip`.
468
469```typescript
470import { fromArray, pipe, take, subscribe } from 'wonka';
471
472pipe(
473 fromArray([1, 2, 3, 4, 5, 6]),
474 take(3),
475 subscribe((val) => console.log(val))
476);
477
478// Prints 1 2 3 to the console.
479```
480
481## takeLast
482
483`takeLast` will take only the last n emissions from the source.
484
485```typescript
486import { fromArray, pipe, takeLast, subscribe } from 'wonka';
487
488pipe(
489 fromArray([1, 2, 3, 4, 5, 6]),
490 takeLast(3),
491 subscribe((val) => console.log(val))
492);
493
494// Prints 4 5 6 to the console.
495```
496
497## takeUntil
498
499Take emissions from an outer source until an inner source (notifier) emits.
500
501```typescript
502import { interval, pipe, takeUntil, subscribe } from 'wonka';
503
504const source = interval(100);
505const notifier = interval(500);
506
507pipe(
508 source,
509 takeUntil(notifier),
510 subscribe((val) => console.log(val))
511);
512
513// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
514// prints 3, pauses 100, then completes (notifier emits).
515```
516
517## takeWhile
518
519Take emissions from the stream while they return `true` for the provided predicate function. If the first emission does not return `true`, no values will be `Push`ed to the sink.
520
521```typescript
522import { pipe, fromArray, takeWhile, subscribe } from 'wonka';
523
524const source = fromArray([1, 2, 3, 4, 5, 6]);
525
526pipe(
527 source,
528 takeWhile((val) => val < 5),
529 subscribe((val) => console.log(val))
530);
531
532// Prints 1 2 3 4 to the console.
533```
534
535## throttle
536
537`throttle` emits values of a source, but after each value it will omit all values for
538the given amount of milliseconds. It enforces a time of silence after each value it
539receives and skips values while the silence is still ongoing.
540
541This is very similar to `debounce` but instead of waiting for leading time before a
542value it waits for trailing time after a value.
543
544```typescript
545import { pipe, interval, throttle, take, subscribe } from 'wonka';
546
547pipe(
548 interval(10),
549 throttle(() => 50)
550 take(2),
551 subscribe(val => console.log(val))
552); // Outputs 0 6 to the console.
553```