title: Operators order: 1#
Operators in Wonka allow you to transform values from a source before they are sent to a sink. Wonka has the following operators.
buffer#
Buffers emissions from an outer source and emits a buffer array of items every time an inner source (notifier) emits.
This operator can be used to group values into a arrays on a source. The emitted values will be sent when a notifier fires and will be arrays of all items before the notification event.
In combination with interval this can be used to group values in chunks regularly.
import { pipe, interval, buffer, take, subscribe } from 'wonka';
pipe(
interval(50),
buffer(interval(100)),
take(2),
subscribe((buffer) => {
buffer.forEach((x) => console.log(x));
console.log(';');
})
); // Prints 1 2; 2 3 to the console.
combine#
combine two sources together to a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source before values of the second source are emitted.
import { fromArray, pipe, combine, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);
pipe(
combine(sourceOne, sourceTwo),
subscribe(([valOne, valTwo]) => {
console.log(valOne + valTwo);
})
); // Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console.
concat#
concat will combine two sources together, subscribing to the next source after the previous source completes.
import { fromArray, pipe, concat, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([6, 5, 4]);
pipe(
concat([sourceOne, sourceTwo]),
subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
concatAll#
concatAll will combine all sources emitted on an outer source together, subscribing to the
next source after the previous source completes.
It's very similar to concat, but instead accepts a source of sources as an input.
import { pipe, fromArray, concatAll, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([6, 5, 4]);
pipe(
fromArray([sourceOne, sourceTwo]),
concatAll,
subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
concatMap#
concatMap allows you to map values of an outer source to an inner source. The sink will not dispatch the Pull signal until the previous value has been emitted. This is in contrast to mergeMap, which will dispatch the Pull signal for new values even if the previous value has not yet been emitted.
import { fromArray, pipe, concatMap, delay, fromValue, subscribe } from 'wonka';
const source = fromArray([1, 2, 3, 4, 5, 6]);
pipe(
source,
concatMap((val) => {
return pipe(fromValue(val), delay(val * 1000));
}),
subscribe((val) => console.log(val))
);
delay#
delay delays all emitted values of a source by the given amount of milliseconds.
import { pipe, fromArray, delay, subscribe } from 'wonka';
pipe(
fromArray([1, 2]),
delay(10)
subscribe(val => console.log(val))
);
// waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends
debounce#
debounce doesn't emit values of a source until no values have been emitted after
a given amount of milliseconds. Once this threshold of silence has been reached, the
last value that has been received will be emitted.
import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';
const sourceA = pipe(interval(10), take(5));
const sourceB = fromValue(1);
pipe(
concat([sourceA, sourceB])
debounce(() => 20),
subscribe(val => console.log(val))
);
// The five values from sourceA will be omitted
// After these values and after 20ms `1` will be logged
filter#
filter will remove values from a source by passing them through an iteratee that returns a bool.
import { fromArray, filter, subscribe } from 'wonka';
const isEven = (n) => n % 2 === 0;
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
filter(isEven),
subscribe((val) => console.log(val))
);
// Prints 246 to the console.
map#
map will transform values from a source by passing them through an iteratee that returns a new value.
import { fromArray, pipe, map, subscribe } from 'wonka';
const square = (n) => n * n;
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
map(square),
subscribe((val) => console.log(val))
);
// Prints 1 4 9 16 25 36 to the console.
merge#
merge merges an array of sources together into a single source. It subscribes
to all sources that it's passed and emits all their values on the output source.
import { fromArray, pipe, merge, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);
pipe(
merge(sourceOne, sourceTwo),
subscribe((val) => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
mergeAll#
mergeAll will merge all sources emitted on an outer source into a single one.
It's very similar to merge, but instead accepts a source of sources as an input.
Note: This operator is also exported as
flattenwhich is just an alias formergeAll
import { pipe, fromArray, mergeAll, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);
pipe(
fromArray([sourceOne, sourceTwo]),
mergeAll,
subscribe((val) => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
mergeMap#
mergeMap allows you to map values of an outer source to an inner source.
This allows you to create nested sources for each emitted value, which will
all be merged into a single source, like with mergeAll.
Unlike concatMap all inner sources will be subscribed to at the same time
and all their values will be emitted on the output source as they come in.
import { pipe, fromArray, mergeMap, subscribe } from 'wonka';
pipe(
fromArray([1, 2]),
mergeMap((x) => fromArray([x - 1, x])),
subscribe((val) => console.log(val))
); // Prints 0 1 1 2 to the console.
onEnd#
Run a callback when the End signal has been sent to the sink by the source, whether by way of the talkback passing the End signal or the source being exhausted of values.
import { fromPromise, pipe, concat, onEnd, subscribe } from 'wonka';
const promiseOne = new Promise((resolve) => {
setTimeout(() => {
resolve('ResolveOne');
}, 1000);
});
const promiseTwo = new Promise((resolve) => {
setTimeout(() => {
resolve('ResolveTwo');
}, 2000);
});
const sourceOne = fromPromise(promiseOne);
const sourceTwo = fromPromise(promiseTwo);
pipe(
concat([sourceOne, sourceTwo]),
onEnd(() => console.log('onEnd')),
subscribe((val) => console.log(val))
);
// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
onPush#
Run a callback on each Push signal sent to the sink by the source.
Note: This operator is also exported as
tapwhich is just an alias foronPush
import { fromArray, pipe, onPush, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
onPush((val) => console.log(`Push ${val}`)),
subscribe((val) => console.log(val))
); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
onStart#
Run a callback when the Start signal is sent to the sink by the source.
import { pipe, onStart, fromPromise, subscribe } from 'wonka';
const promise = new Promise((resolve) => {
setTimeout(() => {
resolve('Resolve');
}, 1000);
});
pipe(
fromPromise(promise),
onStart(() => console.log('onStart')),
subscribe((val) => console.log(val))
);
// Logs onStart to the console, pauses for one second to allow the timeout to finish,
// then logs "Resolve" to the console.
sample#
sample emits the previously emitted value from an outer source every time
an inner source (notifier) emits.
In combination with interval it can be used to get values from a noisy source
more regularly.
import { pipe, interval, sample, take, subscribe } from 'wonka';
pipe(
interval(10),
sample(interval(100)),
take(2),
subscribe((x) => console.log(x))
); // Prints 10 20 to the console.
scan#
Accumulate emitted values of a source in a accumulator, similar to JavaScript reduce.
import { fromArray, pipe, scan, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
scan((acc, val) => acc + val, 0),
subscribe((val) => console.log(val))
);
// Prints 1 3 6 10 15 21 to the console.
share#
share ensures that all subscriptions to the underlying source are shared.
By default Wonka's sources are lazy. They only instantiate themselves and begin emitting signals when they're being subscribed to, since they're also immutable. This means that when a source is used in multiple places, their underlying subscription is not shared. Instead, the entire chain of sources and operators will be instantiated separately every time.
The share operator prevents this by creating an output source that will reuse a single
subscription to the parent source, which will be unsubscribed from when no sinks are
listening to it anymore.
This is especially useful if you introduce side-effects to your sources,
for instance with onStart.
import { pipe, never, onStart, share, publish } from 'wonka';
const source = pipe(
never
onStart(() => console.log('start')),
share
);
// Without share this would print "start" twice:
publish(source);
publish(source);
skip#
skip the specified number of emissions from the source.
import { fromArray, pipe, skip, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
skip(2),
subscribe((val) => console.log(val))
);
skipUntil#
Skip emissions from an outer source until an inner source (notifier) emits.
import { interval, pipe, skipUntil, subscribe } from 'wonka';
const source = interval(100);
const notifier = interval(500);
pipe(
source,
skipUntil(notifier),
subscribe((val) => console.log(val))
);
// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
// Then logs 4 5 6 7 8 9 10... to the console every 500ms.
skipWhile#
Skip values emitted from the source while they return true for the provided predicate function.
import { fromArray, pipe, skipWhile, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
skipWhile((val) => val < 5),
subscribe((val) => console.log(val))
);
// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
switchMap#
switchMap allows you to map values of an outer source to an inner source.
The inner source's values will be emitted on the returned output source. If
a new inner source is returned, because the outer source emitted a new value
before the previous inner source completed, the inner source is closed and unsubscribed
from.
This is similar to concatMap but instead of waiting for the last inner source to complete
before emitting values from the next, switchMap just cancels the previous inner source.
import { pipe, interval, switchMap, take, subscribe } from 'wonka';
pipe(
interval(50),
// The inner interval is cancelled after its first value every time
switchMap((value) => interval(40)),
take(3),
subscribe((x) => console.log(x))
); // Prints 1 2 3 to the console
switchAll#
switchAll will combined sources emitted on an outer source together, subscribing
to only one source at a time, and cancelling the previous inner source, when it hasn't
ended while the next inner source is created.
It's very similar to switchMap, but instead accepts a source of sources.
import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';
pipe(
interval(50),
map(() => interval(40)),
switchAll,
take(3),
subscribe((x) => console.log(x))
); // Prints 1 2 3 to the console
These examples are practically identical to the switchMap examples, but note
that map was used instead of using switchMap directly. This is because combining
map with a subsequent switchAll is the same as using switchMap.
take#
take only a specified number of emissions from the source before completing. take is the opposite of skip.
import { fromArray, pipe, take, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
take(3),
subscribe((val) => console.log(val))
);
// Prints 1 2 3 to the console.
takeLast#
takeLast will take only the last n emissions from the source.
import { fromArray, pipe, takeLast, subscribe } from 'wonka';
pipe(
fromArray([1, 2, 3, 4, 5, 6]),
takeLast(3),
subscribe((val) => console.log(val))
);
// Prints 4 5 6 to the console.
takeUntil#
Take emissions from an outer source until an inner source (notifier) emits.
import { interval, pipe, takeUntil, subscribe } from 'wonka';
const source = interval(100);
const notifier = interval(500);
pipe(
source,
takeUntil(notifier),
subscribe((val) => console.log(val))
);
// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
// prints 3, pauses 100, then completes (notifier emits).
takeWhile#
Take emissions from the stream while they return true for the provided predicate function. If the first emission does not return true, no values will be Pushed to the sink.
import { pipe, fromArray, takeWhile, subscribe } from 'wonka';
const source = fromArray([1, 2, 3, 4, 5, 6]);
pipe(
source,
takeWhile((val) => val < 5),
subscribe((val) => console.log(val))
);
// Prints 1 2 3 4 to the console.
throttle#
throttle emits values of a source, but after each value it will omit all values for
the given amount of milliseconds. It enforces a time of silence after each value it
receives and skips values while the silence is still ongoing.
This is very similar to debounce but instead of waiting for leading time before a
value it waits for trailing time after a value.
import { pipe, interval, throttle, take, subscribe } from 'wonka';
pipe(
interval(10),
throttle(() => 50)
take(2),
subscribe(val => console.log(val))
); // Outputs 0 6 to the console.