Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow

title: Sinks order: 2#

A sink in Wonka expects to be delivered data. A sink communicates with a source via the "talkback" function provided by the source. Wonka has the following sink operators.

subscribe#

subscribe accepts a callback function to execute when data is received from the source, in addition to the source itself.

Wonka.fromArray([|1, 2, 3|])
  |> Wonka.subscribe((. x) => print_int(x));
/* Prints 123 to the console. */
import { pipe, fromArray, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  subscribe((x) => console.log(x))
); // Prints 123 to the console.

subscribe also returns a "subscription" type, which can be used to unsubscribe from the source. This allows you to cancel a source and stop receiving new incoming values.

let subscription = source
  |> Wonka.subscribe((. x) => print_int(x));

subscription.unsubscribe();
import { pipe, subscribe } from 'wonka';

const [unsubscribe] = pipe(
  source,
  subscribe((x) => console.log(x));
);

unsubscribe();

forEach#

forEach works the same as subscribe but doesn't return a subscription. It will just call the passed callback for each incoming value.

Wonka.fromArray([|1, 2, 3|])
  |> Wonka.forEach((. x) => print_int(x));
/* Returns unit; Prints 123 to the console. */
import { pipe, fromArray, forEach } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  forEach((x) => console.log(x))
); // Returns undefined; Prints 123 to the console.

publish#

publish subscribes to a source, like subscribe does, but doesn't accept a callback function. It's useful for side-effects, where the values are already being used as part of the stream itself.

In this example we're using onPush to pass a callback to react to incoming values instead.

Wonka.fromArray([|1, 2, 3|])
  |> Wonka.onPush((. x) => print_int(x))
  |> Wonka.publish;
/* Prints 123 to the console. */
import { pipe, fromArray, onPush, publish } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  onPush((x) => console.log(x)),
  publish
); // Prints 123 to the console.

toPromise#

toPromise returns a promise, which resolves on the last value of a source.

Note: This source is only available in JavaScript environments, and will be excluded when compiling natively.

Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toPromise
  |> Js.Promise.then_(x => {
    print_int(x);
    Js.Promise.resolve(())
  })
/* Prints 3 to the console. */
import { pipe, fromArray, toPromise } from 'wonka';

const promise = pipe(
  fromArray([1, 2, 3]),
  toPromise,
);

promise.then(x => console.log(x));
// Prints 3 to the console.

If you have a source that doesn't complete and are looking to resolve on the first value instead of the last, you may have to apply take(1) to your source.

toObservable#

toObservable returns a spec-compliant JS Observable, which emits the same values as a source.

As per the specification, the Observable is annotated using Symbol.observable.

Note: This sink is only available in JavaScript environments, and will be excluded when compiling natively.

let observable = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toObservable;

observable##subscribe([@bs] {
  as _;
  pub next = value => print_int(value);
  pub complete = () => ();
  pub error = _ => ();
}); /* Prints 1 2 3 to the console. */
import { pipe, fromArray, toObservable } from 'wonka';

const observable = pipe(
  fromArray([1, 2, 3]),
  toObservable,
);

observable.subscribe({
  next: value => console.log(value),
  complete: () => {},
  error: () => {},
}); // Prints 1 2 3 to the console.

toCallbag#

toCallbag returns a spec-compliant JS Callbag, which emits the same signals as a Wonka source.

Since Wonka's sources are very similar to callbags and only diverge from the specification minimally, Callbags map to Wonka's sources very closely and toCallbag only creates a thin wrapper which is mostly concerned with converting between the type signatures.

Note: This sink is only available in JavaScript environments, and will be excluded when compiling natively.

/* This example uses the callbag-iterate package for illustrative purposes */
[@bs.module] external callbagIterate:
  (. ('a => unit)) => (. Wonka.callbagT('a)) => unit = "callbag-iterate";

let callbag = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toCallbag;

callbagIterate(. value => {
  print_int(value);
})(. callbag); /* Prints 1 2 3 to the console. */
import { pipe, fromArray, toCallbag } from 'wonka';

// This example uses the callbag-iterate package for illustrative purposes
import callbagIterate from 'callbag-iterate';

const callbag = pipe(
  fromArray([1, 2, 3]),
  toCallbag,
);

callbagIterate(value => console.log(value))(callbag);
// Prints 1 2 3 to the console.