···
In combination with `interval` this can be used to group values in chunks regularly.
-
|> Wonka.buffer(Wonka.interval(100))
-
|> Wonka.subscribe((. buffer) => {
-
Js.Array.forEach(num => print_int(num), buffer);
-
/* Prints 1 2; 2 3 to the console. */
import { pipe, interval, buffer, take, subscribe } from 'wonka';
-
buffer.forEach(x => console.log(x));
); // Prints 1 2; 2 3 to the console.
···
`combine` two sources together to a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source _before_ values of the second source are emitted.
-
let sourceOne = Wonka.fromArray([|1, 2, 3|]);
-
let sourceTwo = Wonka.fromArray([|4, 5, 6|]);
-
Wonka.combine(sourceOne)
-
|> Wonka.subscribe((. (a, b)) => print_int(a + b));
-
/* Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console. */
import { fromArray, pipe, combine, subscribe } from 'wonka';
···
`concat` will combine two sources together, subscribing to the next source after the previous source completes.
-
let sourceOne = Wonka.fromArray([|1, 2, 3|]);
-
let sourceTwo = Wonka.fromArray([|6, 5, 4|]);
-
Wonka.concat([|sourceOne, sourceTwo|])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 6 5 4 to the console. */
import { fromArray, pipe, concat, subscribe } from 'wonka';
···
concat([sourceOne, sourceTwo]),
-
subscribe(val => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
···
It's very similar to `concat`, but instead accepts a source of sources as an input.
-
let sourceOne = Wonka.fromArray([|1, 2, 3|]);
-
let sourceTwo = Wonka.fromArray([|6, 5, 4|]);
-
Wonka.fromList([sourceOne, sourceTwo])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 6 5 4 to the console. */
import { pipe, fromArray, concatAll, subscribe } from 'wonka';
···
fromArray([sourceOne, sourceTwo]),
-
subscribe(val => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
···
`concatMap` allows you to map values of an outer source to an inner source. The sink will not dispatch the `Pull` signal until the previous value has been emitted. This is in contrast to `mergeMap`, which will dispatch the `Pull` signal for new values even if the previous value has not yet been emitted.
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
-
|> Wonka.concatMap((. _val) =>
-
Wonka.delay(_val * 1000, Wonka.fromValue(_val))
-
|> Wonka.subscribe((. _val) => print_int(_val));
-
/* After 1s, 1 will be emitted. After an additional 2s, 2 will be emitted.
-
After an additional 3s, 3 will be emitted. After an additional 4s, 4 will be emitted.
-
After an additional 5s, 5 will be emitted. After an additional 6s, 6 will be emitted. */
import { fromArray, pipe, concatMap, delay, fromValue, subscribe } from 'wonka';
···
-
subscribe(val => console.log(val))
`delay` delays all emitted values of a source by the given amount of milliseconds.
-
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
-
> when compiling natively.
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends */
import { pipe, fromArray, delay, subscribe } from 'wonka';
···
a given amount of milliseconds. Once this threshold of silence has been reached, the
last value that has been received will be emitted.
-
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
-
> when compiling natively.
-
let sourceA = Wonka.interval(10)
-
let sourceB = Wonka.fromValue(1);
-
Wonka.concat([|sourceA, sourceB|])
-
|> Wonka.debounce((. _x) => 20)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* The five values from sourceA will be omitted */
-
/* After these values and after 20ms `1` will be logged */
import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';
···
`filter` will remove values from a source by passing them through an iteratee that returns a `bool`.
-
let isEven = (. n) => n mod 2 === 0;
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.filter(isEven)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 246 to the console. */
import { fromArray, filter, subscribe } from 'wonka';
-
const isEven = n => n % 2 === 0;
fromArray([1, 2, 3, 4, 5, 6]),
-
subscribe(val => console.log(val))
// Prints 246 to the console.
···
`map` will transform values from a source by passing them through an iteratee that returns a new value.
-
let square = (. n) => n * n;
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 4 9 16 25 36 to the console. */
import { fromArray, pipe, map, subscribe } from 'wonka';
-
const square = n => n * n;
fromArray([1, 2, 3, 4, 5, 6]),
-
subscribe(val => console.log(val))
// Prints 1 4 9 16 25 36 to the console.
···
`merge` merges an array of sources together into a single source. It subscribes
to all sources that it's passed and emits all their values on the output source.
-
let sourceA = Wonka.fromArray([|1, 2, 3|]);
-
let sourceB = Wonka.fromArray([|4, 5, 6|]);
-
Wonka.merge([|sourceA, sourceB|])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 4 5 6 to the console. */
import { fromArray, pipe, merge, subscribe } from 'wonka';
···
> _Note:_ This operator is also exported as `flatten` which is just an alias for `mergeAll`
-
let sourceA = Wonka.fromArray([|1, 2, 3|]);
-
let sourceB = Wonka.fromArray([|4, 5, 6|]);
-
Wonka.fromList([sourceA, sourceB])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 4 5 6 to the console. */
import { pipe, fromArray, mergeAll, subscribe } from 'wonka';
···
fromArray([sourceOne, sourceTwo]),
-
subscribe(val => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
···
Unlike `concatMap` all inner sources will be subscribed to at the same time
and all their values will be emitted on the output source as they come in.
-
|> Wonka.mergeMap((. value) =>
-
Wonka.fromList([value - 1, value]))
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 0 1 1 2 to the console. */
import { pipe, fromArray, mergeMap, subscribe } from 'wonka';
-
mergeMap(x => fromArray([x - 1, x])),
-
subscribe(val => console.log(val))
); // Prints 0 1 1 2 to the console.
···
Run a callback when the `End` signal has been sent to the sink by the source, whether by way of the talkback passing the `End` signal or the source being exhausted of values.
-
Js.Promise.make((~resolve, ~reject as _) =>
-
Js.Global.setTimeout(() => resolve(. "ResolveOne"), 1000) |> ignore
-
Js.Promise.make((~resolve, ~reject as _) =>
-
Js.Global.setTimeout(() => resolve(. "ResolveTwo"), 2000) |> ignore
-
let sourceOne = Wonka.fromPromise(promiseOne);
-
let sourceTwo = Wonka.fromPromise(promiseTwo);
-
Wonka.concat([|sourceOne, sourceTwo|])
-
|> Wonka.onEnd((.) => print_endline("onEnd"))
-
|> Wonka.subscribe((. x) => print_endline(x));
-
/* Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately. */
import { fromPromise, pipe, concat, onEnd, subscribe } from 'wonka';
-
const promiseOne = new Promise(resolve => {
-
const promiseTwo = new Promise(resolve => {
···
concat([sourceOne, sourceTwo]),
onEnd(() => console.log('onEnd')),
-
subscribe(val => console.log(val))
// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
···
> _Note:_ This operator is also exported as `tap` which is just an alias for `onPush`
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.onPush((. x) => print_string({j|Push $x|j}))
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console. */
import { fromArray, pipe, onPush, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
-
onPush(val => console.log(`Push ${val}`)),
-
subscribe(val => console.log(val))
); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
···
Run a callback when the `Start` signal is sent to the sink by the source.
-
Js.Promise.make((~resolve, ~reject as _) =>
-
Js.Global.setTimeout(() => resolve(. "Resolve"), 1000) |> ignore
-
Wonka.fromPromise(promise)
-
|> Wonka.onStart((.) => print_endline("onStart"))
-
|> Wonka.subscribe((. _val) => print_endline(_val));
-
/* Logs onStart to the console, pauses for one second to allow the timeout to finish,
-
then logs "Resolve" to the console. */
import { pipe, onStart, fromPromise, subscribe } from 'wonka';
-
const promise = new Promise(resolve => {
···
onStart(() => console.log('onStart')),
-
subscribe(val => console.log(val))
// Logs onStart to the console, pauses for one second to allow the timeout to finish,
···
In combination with `interval` it can be used to get values from a noisy source
-
|> Wonka.sample(Wonka.interval(100))
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 10 20 to the console. */
import { pipe, interval, sample, take, subscribe } from 'wonka';
-
subscribe(x => console.log(x))
); // Prints 10 20 to the console.
···
Accumulate emitted values of a source in a accumulator, similar to JavaScript `reduce`.
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.scan((. acc, x) => acc + x, 0)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 3 6 10 15 21 to the console. */
import { fromArray, pipe, scan, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
scan((acc, val) => acc + val, 0),
-
subscribe(val => console.log(val))
// Prints 1 3 6 10 15 21 to the console.
···
This is especially useful if you introduce side-effects to your sources,
for instance with `onStart`.
-
let source = Wonka.never
-
|> Wonka.onStart((.) => print_endline("start"))
-
/* Without share this would print "start" twice: */
import { pipe, never, onStart, share, publish } from 'wonka';
···
`skip` the specified number of emissions from the source.
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 3 4 5 6 to the console, since the first two emissions from the source were skipped.
import { fromArray, pipe, skip, subscribe } from 'wonka';
···
fromArray([1, 2, 3, 4, 5, 6]),
-
subscribe(val => console.log(val))
···
Skip emissions from an outer source until an inner source (notifier) emits.
-
let source = Wonka.interval(100);
-
let notifier = Wonka.interval(500);
-
|> Wonka.skipUntil(notifier)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
-
Then logs 4 5 6 7 8 9 10... to the console every 500ms. */
import { interval, pipe, skipUntil, subscribe } from 'wonka';
···
-
subscribe(val => console.log(val))
// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
···
Skip values emitted from the source while they return `true` for the provided predicate function.
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
-
|> Wonka.skipWhile((. _val) => _val < 5)
-
|> Wonka.subscribe((. _val) => print_int(_val));
-
/* Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function. */
import { fromArray, pipe, skipWhile, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
-
skipWhile(val => val < 5),
-
subscribe(val => console.log(val))
// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
···
This is similar to `concatMap` but instead of waiting for the last inner source to complete
before emitting values from the next, `switchMap` just cancels the previous inner source.
-
|> Wonka.switchMap((. _value) =>
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 to the console. */
-
/* The inner interval is cancelled after its first value every time */
import { pipe, interval, switchMap, take, subscribe } from 'wonka';
// The inner interval is cancelled after its first value every time
-
switchMap(value => interval(40)),
-
subscribe(x => console.log(x))
); // Prints 1 2 3 to the console
···
It's very similar to `switchMap`, but instead accepts a source of sources.
-
|> Wonka.map((. _value) =>
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 to the console. */
import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';
···
-
subscribe(x => console.log(x))
); // Prints 1 2 3 to the console
···
`take` only a specified number of emissions from the source before completing. `take` is the opposite of `skip`.
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 to the console. */
import { fromArray, pipe, take, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
-
subscribe(val => console.log(val))
// Prints 1 2 3 to the console.
···
`takeLast` will take only the last n emissions from the source.
-
Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 4 5 6 to the console. */
import { fromArray, pipe, takeLast, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
-
subscribe(val => console.log(val))
// Prints 4 5 6 to the console.
···
Take emissions from an outer source until an inner source (notifier) emits.
-
let source = Wonka.interval(100);
-
let notifier = Wonka.interval(500);
-
|> Wonka.takeUntil(notifier)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
-
prints 3, pauses 100, then completes (notifier emits). */
import { interval, pipe, takeUntil, subscribe } from 'wonka';
···
-
subscribe(val => console.log(val))
// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
···
Take emissions from the stream while they return `true` for the provided predicate function. If the first emission does not return `true`, no values will be `Push`ed to the sink.
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
-
|> Wonka.takeWhile((. x) => x < 5)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Prints 1 2 3 4 to the console. */
import { pipe, fromArray, takeWhile, subscribe } from 'wonka';
···
-
takeWhile(val => val < 5),
-
subscribe(val => console.log(val))
// Prints 1 2 3 4 to the console.
···
This is very similar to `debounce` but instead of waiting for leading time before a
value it waits for trailing time after a value.
-
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
-
> when compiling natively.
-
|> Wonka.throttle((. _x) => 50)
-
|> Wonka.subscribe((. x) => print_int(x));
-
/* Outputs 0 6 to the console. */
import { pipe, interval, throttle, take, subscribe } from 'wonka';
···
In combination with `interval` this can be used to group values in chunks regularly.
import { pipe, interval, buffer, take, subscribe } from 'wonka';
+
subscribe((buffer) => {
+
buffer.forEach((x) => console.log(x));
); // Prints 1 2; 2 3 to the console.
···
`combine` two sources together to a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source _before_ values of the second source are emitted.
import { fromArray, pipe, combine, subscribe } from 'wonka';
···
`concat` will combine two sources together, subscribing to the next source after the previous source completes.
import { fromArray, pipe, concat, subscribe } from 'wonka';
···
concat([sourceOne, sourceTwo]),
+
subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
···
It's very similar to `concat`, but instead accepts a source of sources as an input.
import { pipe, fromArray, concatAll, subscribe } from 'wonka';
···
fromArray([sourceOne, sourceTwo]),
+
subscribe((val) => console.log(val))
); // Prints 1 2 3 6 5 4 to the console.
···
`concatMap` allows you to map values of an outer source to an inner source. The sink will not dispatch the `Pull` signal until the previous value has been emitted. This is in contrast to `mergeMap`, which will dispatch the `Pull` signal for new values even if the previous value has not yet been emitted.
import { fromArray, pipe, concatMap, delay, fromValue, subscribe } from 'wonka';
···
+
return pipe(fromValue(val), delay(val * 1000));
+
subscribe((val) => console.log(val))
`delay` delays all emitted values of a source by the given amount of milliseconds.
import { pipe, fromArray, delay, subscribe } from 'wonka';
···
a given amount of milliseconds. Once this threshold of silence has been reached, the
last value that has been received will be emitted.
import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';
···
`filter` will remove values from a source by passing them through an iteratee that returns a `bool`.
import { fromArray, filter, subscribe } from 'wonka';
+
const isEven = (n) => n % 2 === 0;
fromArray([1, 2, 3, 4, 5, 6]),
+
subscribe((val) => console.log(val))
// Prints 246 to the console.
···
`map` will transform values from a source by passing them through an iteratee that returns a new value.
import { fromArray, pipe, map, subscribe } from 'wonka';
+
const square = (n) => n * n;
fromArray([1, 2, 3, 4, 5, 6]),
+
subscribe((val) => console.log(val))
// Prints 1 4 9 16 25 36 to the console.
···
`merge` merges an array of sources together into a single source. It subscribes
to all sources that it's passed and emits all their values on the output source.
import { fromArray, pipe, merge, subscribe } from 'wonka';
···
> _Note:_ This operator is also exported as `flatten` which is just an alias for `mergeAll`
import { pipe, fromArray, mergeAll, subscribe } from 'wonka';
···
fromArray([sourceOne, sourceTwo]),
+
subscribe((val) => console.log(val))
); // Prints 1 2 3 4 5 6 to the console.
···
Unlike `concatMap` all inner sources will be subscribed to at the same time
and all their values will be emitted on the output source as they come in.
import { pipe, fromArray, mergeMap, subscribe } from 'wonka';
+
mergeMap((x) => fromArray([x - 1, x])),
+
subscribe((val) => console.log(val))
); // Prints 0 1 1 2 to the console.
···
Run a callback when the `End` signal has been sent to the sink by the source, whether by way of the talkback passing the `End` signal or the source being exhausted of values.
import { fromPromise, pipe, concat, onEnd, subscribe } from 'wonka';
+
const promiseOne = new Promise((resolve) => {
+
const promiseTwo = new Promise((resolve) => {
···
concat([sourceOne, sourceTwo]),
onEnd(() => console.log('onEnd')),
+
subscribe((val) => console.log(val))
// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
···
> _Note:_ This operator is also exported as `tap` which is just an alias for `onPush`
import { fromArray, pipe, onPush, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
+
onPush((val) => console.log(`Push ${val}`)),
+
subscribe((val) => console.log(val))
); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
···
Run a callback when the `Start` signal is sent to the sink by the source.
import { pipe, onStart, fromPromise, subscribe } from 'wonka';
+
const promise = new Promise((resolve) => {
···
onStart(() => console.log('onStart')),
+
subscribe((val) => console.log(val))
// Logs onStart to the console, pauses for one second to allow the timeout to finish,
···
In combination with `interval` it can be used to get values from a noisy source
import { pipe, interval, sample, take, subscribe } from 'wonka';
+
subscribe((x) => console.log(x))
); // Prints 10 20 to the console.
···
Accumulate emitted values of a source in a accumulator, similar to JavaScript `reduce`.
import { fromArray, pipe, scan, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
scan((acc, val) => acc + val, 0),
+
subscribe((val) => console.log(val))
// Prints 1 3 6 10 15 21 to the console.
···
This is especially useful if you introduce side-effects to your sources,
for instance with `onStart`.
import { pipe, never, onStart, share, publish } from 'wonka';
···
`skip` the specified number of emissions from the source.
import { fromArray, pipe, skip, subscribe } from 'wonka';
···
fromArray([1, 2, 3, 4, 5, 6]),
+
subscribe((val) => console.log(val))
···
Skip emissions from an outer source until an inner source (notifier) emits.
import { interval, pipe, skipUntil, subscribe } from 'wonka';
···
+
subscribe((val) => console.log(val))
// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
···
Skip values emitted from the source while they return `true` for the provided predicate function.
import { fromArray, pipe, skipWhile, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
+
skipWhile((val) => val < 5),
+
subscribe((val) => console.log(val))
// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
···
This is similar to `concatMap` but instead of waiting for the last inner source to complete
before emitting values from the next, `switchMap` just cancels the previous inner source.
import { pipe, interval, switchMap, take, subscribe } from 'wonka';
// The inner interval is cancelled after its first value every time
+
switchMap((value) => interval(40)),
+
subscribe((x) => console.log(x))
); // Prints 1 2 3 to the console
···
It's very similar to `switchMap`, but instead accepts a source of sources.
import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';
···
+
subscribe((x) => console.log(x))
); // Prints 1 2 3 to the console
···
`take` only a specified number of emissions from the source before completing. `take` is the opposite of `skip`.
import { fromArray, pipe, take, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
+
subscribe((val) => console.log(val))
// Prints 1 2 3 to the console.
···
`takeLast` will take only the last n emissions from the source.
import { fromArray, pipe, takeLast, subscribe } from 'wonka';
fromArray([1, 2, 3, 4, 5, 6]),
+
subscribe((val) => console.log(val))
// Prints 4 5 6 to the console.
···
Take emissions from an outer source until an inner source (notifier) emits.
import { interval, pipe, takeUntil, subscribe } from 'wonka';
···
+
subscribe((val) => console.log(val))
// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
···
Take emissions from the stream while they return `true` for the provided predicate function. If the first emission does not return `true`, no values will be `Push`ed to the sink.
import { pipe, fromArray, takeWhile, subscribe } from 'wonka';
···
+
takeWhile((val) => val < 5),
+
subscribe((val) => console.log(val))
// Prints 1 2 3 4 to the console.
···
This is very similar to `debounce` but instead of waiting for leading time before a
value it waits for trailing time after a value.
import { pipe, interval, throttle, take, subscribe } from 'wonka';