···
Operators in Wonka allow you to transform values from a source before they are sent to a sink. Wonka has the following operators.
10
+
Buffers emissions from an outer source and emits a buffer array of items every time an
11
+
inner source (notifier) emits.
13
+
This operator can be used to group values into a arrays on a source. The emitted values will
14
+
be sent when a notifier fires and will be arrays of all items before the notification event.
16
+
In combination with `interval` this can be used to group values in chunks regularly.
20
+
|> Wonka.buffer(Wonka.interval(100))
22
+
|> Wonka.subscribe((. buffer) => {
23
+
Js.Array.forEach(num => print_int(num), buffer);
26
+
/* Prints 1 2; 2 3 to the console. */
30
+
import { pipe, interval, buffer, take, subscribe } from 'wonka';
34
+
buffer(interval(100)),
36
+
subscribe(buffer => {
37
+
buffer.forEach(x => console.log(x));
40
+
); // Prints 1 2; 2 3 to the console.
`combine` two sources together to a single source. The emitted value will be a combination of the two sources, with all values from the first source being emitted with the first value of the second source _before_ values of the second source are emitted.
···
let sourceTwo = Wonka.fromArray([|4, 5, 6|]);
Wonka.combine(sourceOne, sourceTwo)
17
-
|> Wonka.subscribe((. (_valOne, _valTwo)) => print_int(_valOne + _valTwo));
52
+
|> Wonka.subscribe((. (a, b)) => print_int(a + b));
/* Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console. */
···
subscribe(([valOne, valTwo]) => {
console.log(valOne + valTwo);
35
-
// Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console.
68
+
); // Prints 56789 (1+4, 2+4, 3+4, 3+5, 3+6) to the console.
···
`concat` will combine two sources together, subscribing to the next source after the previous source completes.
43
-
let sourceOne = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
44
-
let sourceTwo = Wonka.fromArray([|6, 5, 4, 3, 2, 1|]);
46
-
Wonka.concat([|sourceOne, sourceTwo|]) |> Wonka.subscribe((. _val) => print_int(_val));
76
+
let sourceOne = Wonka.fromArray([|1, 2, 3|]);
77
+
let sourceTwo = Wonka.fromArray([|6, 5, 4|]);
48
-
/* Prints 1 2 3 4 5 6 6 5 4 3 2 1 to the console. */
79
+
Wonka.concat([|sourceOne, sourceTwo|])
80
+
|> Wonka.subscribe((. x) => print_int(x));
81
+
/* Prints 1 2 3 6 5 4 to the console. */
import { fromArray, pipe, concat, subscribe } from 'wonka';
54
-
const sourceOne = fromArray([1, 2, 3, 4, 5, 6]);
55
-
const sourceTwo = fromArray([6, 5, 4, 3, 2, 1]);
87
+
const sourceOne = fromArray([1, 2, 3]);
88
+
const sourceTwo = fromArray([6, 5, 4]);
concat([sourceOne, sourceTwo]),
92
+
subscribe(val => console.log(val))
93
+
); // Prints 1 2 3 6 5 4 to the console.
64
-
// Prints 1 2 3 4 5 6 6 5 4 3 2 1 to the console.
98
+
`concatAll` will combine all sources emitted on an outer source together, subscribing to the
99
+
next source after the previous source completes.
101
+
It's very similar to `concat`, but instead accepts a source of sources as an input.
104
+
let sourceOne = Wonka.fromArray([|1, 2, 3|]);
105
+
let sourceTwo = Wonka.fromArray([|6, 5, 4|]);
107
+
Wonka.fromList([sourceOne, sourceTwo])
109
+
|> Wonka.subscribe((. x) => print_int(x));
110
+
/* Prints 1 2 3 6 5 4 to the console. */
114
+
import { pipe, fromArray, concatAll, subscribe } from 'wonka';
116
+
const sourceOne = fromArray([1, 2, 3]);
117
+
const sourceTwo = fromArray([6, 5, 4]);
120
+
fromArray([sourceOne, sourceTwo]),
122
+
subscribe(val => console.log(val))
123
+
); // Prints 1 2 3 6 5 4 to the console.
···
157
+
subscribe(val => console.log(val))
164
+
`delay` delays all emitted values of a source by the given amount of milliseconds.
166
+
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
167
+
> when compiling natively.
170
+
Wonka.fromList([1, 2])
172
+
|> Wonka.subscribe((. x) => print_int(x));
173
+
/* waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends */
177
+
import { pipe, fromArray, delay, subscribe } from 'wonka';
182
+
subscribe(val => console.log(val))
184
+
// waits 10ms then prints 1, waits 10ms then prints 2, waits 10ms then ends
189
+
`debounce` doesn't emit values of a source until no values have been emitted after
190
+
a given amount of milliseconds. Once this threshold of silence has been reached, the
191
+
last value that has been received will be emitted.
193
+
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
194
+
> when compiling natively.
197
+
let sourceA = Wonka.interval(10)
199
+
let sourceB = Wonka.fromValue(1);
201
+
Wonka.concat([|sourceA, sourceB|])
202
+
|> Wonka.debounce((. _x) => 20)
203
+
|> Wonka.subscribe((. x) => print_int(x));
204
+
/* The five values from sourceA will be omitted */
205
+
/* After these values and after 20ms `1` will be logged */
209
+
import { pipe, interval, take, fromValue, concat, debounce, subscribe } from 'wonka';
211
+
const sourceA = pipe(interval(10), take(5));
212
+
const sourceB = fromValue(1);
215
+
concat([sourceA, sourceB])
216
+
debounce(() => 20),
217
+
subscribe(val => console.log(val))
220
+
// The five values from sourceA will be omitted
221
+
// After these values and after 20ms `1` will be logged
···
`filter` will remove values from a source by passing them through an iteratee that returns a `bool`.
109
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
let isEven = (. n) => n mod 2 === 0;
112
-
source |> Wonka.filter(isEven) |> Wonka.subscribe((. _val) => print_int(_val));
231
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
232
+
|> Wonka.filter(isEven)
233
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 246 to the console. */
import { fromArray, filter, subscribe } from 'wonka';
120
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
const isEven = n => n % 2 === 0;
243
+
fromArray([1, 2, 3, 4, 5, 6]),
245
+
subscribe(val => console.log(val))
// Prints 246 to the console.
···
`map` will transform values from a source by passing them through an iteratee that returns a new value.
139
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
let square = (. n) => n * n;
142
-
source |> Wonka.map(square) |> Wonka.subscribe((. _val) => print_int(_val));
258
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
259
+
|> Wonka.map(square)
260
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 1 4 9 16 25 36 to the console. */
import { fromArray, pipe, map, subscribe } from 'wonka';
150
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
const square = n => n * n;
270
+
fromArray([1, 2, 3, 4, 5, 6]),
272
+
subscribe(val => console.log(val))
// Prints 1 4 9 16 25 36 to the console.
···
166
-
`merge` two sources together into a single source.
280
+
`merge` merges an array of sources together into a single source. It subscribes
281
+
to all sources that it's passed and emits all their values on the output source.
let sourceA = Wonka.fromArray([|1, 2, 3|]);
let sourceB = Wonka.fromArray([|4, 5, 6|]);
172
-
Wonka.merge([|sourceA, sourceB|]) |> Wonka.subscribe((. _val) => print_int(_val));
287
+
Wonka.merge([|sourceA, sourceB|])
288
+
|> Wonka.subscribe((. x) => print_int(x));
289
+
/* Prints 1 2 3 4 5 6 to the console. */
174
-
/* Prints 1 2 3 4 5 6 to the console.
293
+
import { fromArray, pipe, merge, subscribe } from 'wonka';
295
+
const sourceOne = fromArray([1, 2, 3]);
296
+
const sourceTwo = fromArray([4, 5, 6]);
299
+
merge(sourceOne, sourceTwo),
300
+
subscribe((val) => console.log(val))
301
+
); // Prints 1 2 3 4 5 6 to the console.
306
+
`mergeAll` will merge all sources emitted on an outer source into a single one.
307
+
It's very similar to `merge`, but instead accepts a source of sources as an input.
309
+
> _Note:_ This operator is also exported as `flatten` which is just an alias for `mergeAll`
312
+
let sourceA = Wonka.fromArray([|1, 2, 3|]);
313
+
let sourceB = Wonka.fromArray([|4, 5, 6|]);
315
+
Wonka.fromList([sourceA, sourceB])
317
+
|> Wonka.subscribe((. x) => print_int(x));
318
+
/* Prints 1 2 3 4 5 6 to the console. */
178
-
import { fromArray, pipe, merge, subscribe } from 'wonka';
322
+
import { pipe, fromArray, mergeAll, subscribe } from 'wonka';
const sourceOne = fromArray([1, 2, 3]);
const sourceTwo = fromArray([4, 5, 6]);
184
-
merge(sourceOne, sourceTwo)
185
-
subscribe((val) => {
328
+
fromArray([sourceOne, sourceTwo]),
330
+
subscribe(val => console.log(val))
331
+
); // Prints 1 2 3 4 5 6 to the console.
190
-
// Prints 1 2 3 4 5 6 to the console.
336
+
`mergeMap` allows you to map values of an outer source to an inner source.
337
+
This allows you to create nested sources for each emitted value, which will
338
+
all be merged into a single source, like with `mergeAll`.
340
+
Unlike `concatMap` all inner sources will be subscribed to at the same time
341
+
and all their values will be emitted on the output source as they come in.
344
+
Wonka.fromList([1, 2])
345
+
|> Wonka.mergeMap((. value) =>
346
+
Wonka.fromList([value - 1, value]))
347
+
|> Wonka.subscribe((. x) => print_int(x));
348
+
/* Prints 0 1 1 2 to the console. */
352
+
import { pipe, fromArray, mergeMap, subscribe } from 'wonka';
356
+
mergeMap(x => fromArray([x - 1, x])),
357
+
subscribe(val => console.log(val))
358
+
); // Prints 0 1 1 2 to the console.
···
let sourceOne = Wonka.fromPromise(promiseOne);
let sourceTwo = Wonka.fromPromise(promiseTwo);
210
-
let source = Wonka.concat([|sourceOne, sourceTwo|]);
213
-
|> Wonka.onEnd((.) => print_endline("onEnd"))
214
-
|> Wonka.subscribe((. _val) => print_endline(_val));
379
+
Wonka.concat([|sourceOne, sourceTwo|])
380
+
|> Wonka.onEnd((.) => print_endline("onEnd"))
381
+
|> Wonka.subscribe((. x) => print_endline(x));
/* Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately. */
···
const sourceOne = fromPromise(promiseOne);
const sourceTwo = fromPromise(promiseTwo);
235
-
const source = concat([sourceOne, sourceTwo]);
240
-
console.log('onEnd');
404
+
concat([sourceOne, sourceTwo]),
405
+
onEnd(() => console.log('onEnd')),
406
+
subscribe(val => console.log(val))
// Logs ResolveOne after one second, then ResolveTwo after an additional second, then onEnd immediately.
···
Run a callback on each `Push` signal sent to the sink by the source.
255
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
258
-
|> Wonka.onPush((. _val) => print_string({j|Push $_val|j}))
259
-
|> Wonka.subscribe((. _val) => print_int(_val));
417
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
418
+
|> Wonka.onPush((. x) => print_string({j|Push $x|j}))
419
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console. */
import { fromArray, pipe, onPush, subscribe } from 'wonka';
267
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
272
-
console.log(`Push ${val}`);
279
-
// Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
427
+
fromArray([1, 2, 3, 4, 5, 6]),
428
+
onPush(val => console.log(`Push ${val}`)),
429
+
subscribe(val => console.log(val))
430
+
); // Prints Push 1 1 Push 2 2 Push 3 3 Push 4 4 Push 5 5 Push 6 6 to the console.
···
Js.Global.setTimeout(() => resolve(. "Resolve"), 1000) |> ignore
292
-
let source = Wonka.fromPromise(promise);
295
-
|> Wonka.onStart((.) => print_endline("onStart"))
296
-
|> Wonka.subscribe((. _val) => print_endline(_val));
443
+
Wonka.fromPromise(promise)
444
+
|> Wonka.onStart((.) => print_endline("onStart"))
445
+
|> Wonka.subscribe((. _val) => print_endline(_val));
/* Logs onStart to the console, pauses for one second to allow the timeout to finish,
then logs "Resolve" to the console. */
···
311
-
const source = fromPromise(promise);
316
-
console.log('onStart');
460
+
fromPromise(promise),
461
+
onStart(() => console.log('onStart')),
462
+
subscribe(val => console.log(val))
// Logs onStart to the console, pauses for one second to allow the timeout to finish,
// then logs "Resolve" to the console.
471
+
`sample` emits the previously emitted value from an outer source every time
472
+
an inner source (notifier) emits.
474
+
In combination with `interval` it can be used to get values from a noisy source
479
+
|> Wonka.sample(Wonka.interval(100))
481
+
|> Wonka.subscribe((. x) => print_int(x));
482
+
/* Prints 10 20 to the console. */
486
+
import { pipe, interval, sample, take, subscribe } from 'wonka';
490
+
sample(interval(100)),
492
+
subscribe(x => console.log(x))
493
+
); // Prints 10 20 to the console.
Accumulate emitted values of a source in a accumulator, similar to JavaScript `reduce`.
332
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
335
-
|> Wonka.scan((. acc, x) => acc + x, 0)
336
-
|> Wonka.subscribe((. _val) => print_int(_val));
501
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
502
+
|> Wonka.scan((. acc, x) => acc + x, 0)
503
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 1 3 6 10 15 21 to the console. */
import { fromArray, pipe, scan, subscribe } from 'wonka';
343
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
511
+
fromArray([1, 2, 3, 4, 5, 6]),
scan((acc, val) => acc + val),
513
+
subscribe(val => console.log(val))
// Prints 1 3 6 10 15 21 to the console.
521
+
`share` ensures that all subscriptions to the underlying source are shared.
358
-
`skip` the specified number of emissions from the source.
523
+
By default Wonka's sources are lazy. They only instantiate themselves and begin
524
+
emitting signals when they're being subscribed to, since they're also immutable.
525
+
This means that when a source is used in multiple places, their underlying subscription
526
+
is not shared. Instead, the entire chain of sources and operators will be instantiated
527
+
separately every time.
529
+
The `share` operator prevents this by creating an output source that will reuse a single
530
+
subscription to the parent source, which will be unsubscribed from when no sinks are
531
+
listening to it anymore.
533
+
This is especially useful if you introduce side-effects to your sources,
534
+
for instance with `onStart`.
361
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
537
+
let source = Wonka.never
538
+
|> Wonka.onStart((.) => print_endline("start"))
363
-
source |> Wonka.skip(2) |> Wonka.subscribe((. _val) => print_int(_val));
541
+
/* Without share this would print "start" twice: */
542
+
Wonka.publish(source);
543
+
Wonka.publish(source);
547
+
import { pipe, never, onStart, share, publish } from 'wonka';
549
+
const source = pipe(
551
+
onStart(() => console.log('start')),
555
+
// Without share this would print "start" twice:
562
+
`skip` the specified number of emissions from the source.
565
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
567
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 3 4 5 6 to the console, since the first two emissions from the source were skipped.
import { fromArray, pipe, skip, subscribe } from 'wonka';
371
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
575
+
fromArray([1, 2, 3, 4, 5, 6]),
577
+
subscribe(val => console.log(val))
···
let source = Wonka.interval(100);
let notifier = Wonka.interval(500);
390
-
source |> Wonka.skipUntil(notifier) |> Wonka.subscribe((. _val) => print_int(_val));
590
+
|> Wonka.skipUntil(notifier)
591
+
|> Wonka.subscribe((. x) => print_int(x));
/* Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
Then logs 4 5 6 7 8 9 10... to the console every 500ms. */
···
606
+
subscribe(val => console.log(val))
// Skips all values emitted by source (0, 1, 2, 3) until notifier emits at 500ms.
···
import { fromArray, pipe, skipWhile, subscribe } from 'wonka';
431
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
631
+
fromArray([1, 2, 3, 4, 5, 6]),
skipWhile(val => val < 5),
633
+
subscribe(val => console.log(val))
// Prints 5 6 to the console, as 1 2 3 4 all return true for the predicate function.
641
+
`switchMap` allows you to map values of an outer source to an inner source.
642
+
The inner source's values will be emitted on the returned output source. If
643
+
a new inner source is returned, because the outer source emitted a new value
644
+
before the previous inner source completed, the inner source is closed and unsubscribed
647
+
This is similar to `concatMap` but instead of waiting for the last inner source to complete
648
+
before emitting values from the next, `switchMap` just cancels the previous inner source.
652
+
|> Wonka.switchMap((. _value) =>
653
+
Wonka.interval(40))
655
+
|> Wonka.subscribe((. x) => print_int(x));
656
+
/* Prints 1 2 3 to the console. */
657
+
/* The inner interval is cancelled after its first value every time */
661
+
import { pipe, interval, switchMap, take, subscribe } from 'wonka';
665
+
// The inner interval is cancelled after its first value every time
666
+
switchMap(value => interval(40)),
668
+
subscribe(x => console.log(x))
669
+
); // Prints 1 2 3 to the console
674
+
`switchAll` will combined sources emitted on an outer source together, subscribing
675
+
to only one source at a time, and cancelling the previous inner source, when it hasn't
676
+
ended while the next inner source is created.
678
+
It's very similar to `switchMap`, but instead accepts a source of sources.
682
+
|> Wonka.map((. _value) =>
683
+
Wonka.interval(40))
686
+
|> Wonka.subscribe((. x) => print_int(x));
687
+
/* Prints 1 2 3 to the console. */
691
+
import { pipe, interval, map, switchAll, take, subscribe } from 'wonka';
695
+
map(() => interval(40)),
698
+
subscribe(x => console.log(x))
699
+
); // Prints 1 2 3 to the console
702
+
These examples are practically identical to the `switchMap` examples, but note
703
+
that `map` was used instead of using `switchMap` directly. This is because combining
704
+
`map` with a subsequent `switchAll` is the same as using `switchMap`.
`take` only a specified number of emissions from the source before completing. `take` is the opposite of `skip`.
449
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
451
-
source |> Wonka.take(3) |> Wonka.subscribe((. _val) => print_int(_val));
711
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|])
713
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 1 2 3 to the console. */
import { fromArray, pipe, take, subscribe } from 'wonka';
459
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
721
+
fromArray([1, 2, 3, 4, 5, 6]),
723
+
subscribe(val => console.log(val))
// Prints 1 2 3 to the console.
···
`takeLast` will take only the last n emissions from the source.
477
-
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
479
-
source |> Wonka.takeLast(3) |> Wonka.subscribe((. _val) => print_int(_val));
734
+
Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
735
+
|> Wonka.takeLast(3)
736
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 4 5 6 to the console. */
import { fromArray, pipe, takeLast, subscribe } from 'wonka';
487
-
const source = fromArray([1, 2, 3, 4, 5, 6]);
744
+
fromArray([1, 2, 3, 4, 5, 6]),
746
+
subscribe(val => console.log(val))
// Prints 4 5 6 to the console.
···
let notifier = Wonka.interval(500);
509
-
|> Wonka.takeUntil(notifier)
510
-
|> Wonka.subscribe((. _val) => print_int(_val));
761
+
|> Wonka.takeUntil(notifier)
762
+
|> Wonka.subscribe((. x) => print_int(x));
/* Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
prints 3, pauses 100, then completes (notifier emits). */
···
777
+
subscribe(val => console.log(val))
// Pauses 100ms, prints 0, pauses 100ms, prints 1, pauses 100ms, prints 2, pauses 100ms,
···
let source = Wonka.fromArray([|1, 2, 3, 4, 5, 6|]);
542
-
|> Wonka.takeWhile((. _val) => _val < 5)
543
-
|> Wonka.subscribe((. _val) => print_int(_val));
792
+
|> Wonka.takeWhile((. x) => x < 5)
793
+
|> Wonka.subscribe((. x) => print_int(x));
/* Prints 1 2 3 4 to the console. */
···
takeWhile(val => val < 5),
806
+
subscribe(val => console.log(val))
// Prints 1 2 3 4 to the console.
814
+
`throttle` emits values of a source, but after each value it will omit all values for
815
+
the given amount of milliseconds. It enforces a time of silence after each value it
816
+
receives and skips values while the silence is still ongoing.
818
+
This is very similar to `debounce` but instead of waiting for leading time before a
819
+
value it waits for trailing time after a value.
821
+
> _Note:_ This operator is only available in JavaScript environments, and will be excluded
822
+
> when compiling natively.
826
+
|> Wonka.throttle((. _x) => 50)
828
+
|> Wonka.subscribe((. x) => print_int(x));
829
+
/* Outputs 0 6 to the console. */
833
+
import { pipe, interval, throttle, take, subscribe } from 'wonka';
839
+
subscribe(val => console.log(val))
840
+
); // Outputs 0 6 to the console.