---
title: Sinks
order: 2
---

A sink in Wonka expects to be delivered data. A `sink` communicates with a source through the "talkback" function that the source provides. Wonka has the following `sink` operators.

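The talkback mechanism mentioned above can be sketched in plain TypeScript. This is a simplified model and not Wonka's implementation: the `Signal`, `Talkback`, `Sink`, and `Source` types and the `fromArraySketch` and `collect` helpers are hypothetical names made up for this illustration. It assumes a source first hands the sink a talkback, and the sink then uses that talkback to ask for values.

```typescript
// Simplified model of the signals exchanged between a source and a sink.
// These names are illustrative and are NOT imported from Wonka.
type Talkback = (signal: 'pull' | 'close') => void;

type Signal<T> =
  | { kind: 'start'; talkback: Talkback }
  | { kind: 'push'; value: T }
  | { kind: 'end' };

type Sink<T> = (signal: Signal<T>) => void;
type Source<T> = (sink: Sink<T>) => void;

// A source that emits one array element each time the sink pulls.
function fromArraySketch<T>(items: T[]): Source<T> {
  return (sink) => {
    let index = 0;
    let ended = false;
    sink({
      kind: 'start',
      talkback: (signal) => {
        if (ended) return;
        if (signal === 'close') {
          ended = true;
        } else if (index < items.length) {
          sink({ kind: 'push', value: items[index++] });
        } else {
          ended = true;
          sink({ kind: 'end' });
        }
      },
    });
  };
}

// A sink that stores the talkback and pulls again after every value.
function collect<T>(source: Source<T>): T[] {
  const received: T[] = [];
  let talkback: Talkback = () => {};
  source((signal) => {
    if (signal.kind === 'start') {
      talkback = signal.talkback;
      talkback('pull');
    } else if (signal.kind === 'push') {
      received.push(signal.value);
      talkback('pull');
    }
  });
  return received;
}

const received = collect(fromArraySketch([1, 2, 3]));
console.log(received); // [ 1, 2, 3 ]
```

In this model, every sink described below would be a variation of `collect`: it receives the talkback once, then decides when to request more data or to close the source.
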
## subscribe

`subscribe` accepts a callback function to execute when data is received from the source, in addition to the source itself.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.subscribe((. x) => print_int(x));
/* Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  subscribe((x) => console.log(x))
); // Prints 1 2 3 to the console.
```

`subscribe` also returns a "subscription" type, which can be used to
unsubscribe from the source. This allows you to cancel a source and stop receiving
new incoming values.

```reason
let subscription = source
  |> Wonka.subscribe((. x) => print_int(x));

subscription.unsubscribe();
```

```typescript
import { pipe, subscribe } from 'wonka';

// `source` stands for any Wonka source created elsewhere.
const [unsubscribe] = pipe(
  source,
  subscribe((x) => console.log(x))
);

unsubscribe();
```

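To make the effect of unsubscribing concrete, here is a minimal, self-contained sketch. It does not use Wonka itself: `makeManualSource` and `subscribeSketch` are hypothetical helpers, and the signal types are a simplified model. The sketch assumes that unsubscribing sends a "close" signal through the talkback, after which the source stops delivering values to the sink.

```typescript
// Hypothetical, simplified types; NOT Wonka's real signatures.
type Talkback = (signal: 'pull' | 'close') => void;
type Signal<T> =
  | { kind: 'start'; talkback: Talkback }
  | { kind: 'push'; value: T }
  | { kind: 'end' };
type Source<T> = (sink: (signal: Signal<T>) => void) => void;

// A push-based source plus an `emit` function to feed it from outside.
function makeManualSource<T>(): { source: Source<T>; emit: (value: T) => void } {
  let sink: ((signal: Signal<T>) => void) | null = null;
  const source: Source<T> = (nextSink) => {
    sink = nextSink;
    nextSink({
      kind: 'start',
      talkback: (signal) => {
        // A 'close' signal detaches the sink, so later values are dropped.
        if (signal === 'close') sink = null;
      },
    });
  };
  const emit = (value: T) => {
    if (sink) sink({ kind: 'push', value });
  };
  return { source, emit };
}

// Sketch of a subscribe-like sink that returns a subscription object.
function subscribeSketch<T>(source: Source<T>, onValue: (value: T) => void) {
  let talkback: Talkback = () => {};
  let closed = false;
  source((signal) => {
    if (signal.kind === 'start') talkback = signal.talkback;
    else if (signal.kind === 'push' && !closed) onValue(signal.value);
  });
  return {
    unsubscribe() {
      if (!closed) {
        closed = true;
        talkback('close');
      }
    },
  };
}

const { source, emit } = makeManualSource<number>();
const seen: number[] = [];
const subscription = subscribeSketch(source, (x) => seen.push(x));

emit(1);
emit(2);
subscription.unsubscribe();
emit(3); // Ignored, because the subscription was cancelled.

console.log(seen); // [ 1, 2 ]
```

The exact shape of the returned subscription in this sketch is an assumption made for readability; refer to the examples above for how Wonka exposes `unsubscribe`.
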
## forEach

`forEach` works the same as `subscribe` but doesn't return a subscription.
It will just call the passed callback for each incoming value.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.forEach((. x) => print_int(x));
/* Returns unit; Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, forEach } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  forEach((x) => console.log(x))
); // Returns undefined; Prints 1 2 3 to the console.
```

## publish

`publish` subscribes to a source, like `subscribe` does, but doesn't accept
a callback function. It's useful when a stream is run only for its side effects,
because the values are already being used as part of the stream itself.

In this example we're using [`onPush`](./operators.md#onpush) to pass a callback that reacts to incoming
values instead.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.onPush((. x) => print_int(x))
  |> Wonka.publish;
/* Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, onPush, publish } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  onPush((x) => console.log(x)),
  publish
); // Prints 1 2 3 to the console.
```

## toPromise

`toPromise` returns a promise, which resolves with the last value of a source once the source completes.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toPromise
  |> Js.Promise.then_(x => {
    print_int(x);
    Js.Promise.resolve(())
  })
/* Prints 3 to the console. */
```

```typescript
import { pipe, fromArray, toPromise } from 'wonka';

const promise = pipe(
  fromArray([1, 2, 3]),
  toPromise,
);

promise.then(x => console.log(x));
// Prints 3 to the console.
```

If you have a source that doesn't complete and you want the promise to resolve on the first
value instead of the last, you may have to apply `take(1)` to your source before passing it to `toPromise`.

## toObservable

`toObservable` returns a [spec-compliant JS Observable](https://github.com/tc39/proposal-observable), which emits the same
values as a source.

As per the specification, the Observable is annotated using `Symbol.observable`.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
let observable = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toObservable;

observable##subscribe([@bs] {
  as _;
  pub next = value => print_int(value);
  pub complete = () => ();
  pub error = _ => ();
}); /* Prints 1 2 3 to the console. */
```

```typescript
import { pipe, fromArray, toObservable } from 'wonka';

const observable = pipe(
  fromArray([1, 2, 3]),
  toObservable,
);

observable.subscribe({
  next: value => console.log(value),
  complete: () => {},
  error: () => {},
}); // Prints 1 2 3 to the console.
```

## toCallbag

`toCallbag` returns a [spec-compliant JS Callbag](https://github.com/callbag/callbag), which emits the same signals
as a Wonka source.

Wonka's sources are very similar to callbags and diverge from the specification only
minimally. Because callbags map to Wonka's sources so closely, `toCallbag` only creates a thin
wrapper, which is mostly concerned with converting between the type signatures.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
/* This example uses the callbag-iterate package for illustrative purposes */
[@bs.module] external callbagIterate:
  (. ('a => unit)) => (. Wonka.callbagT('a)) => unit = "callbag-iterate";

let callbag = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toCallbag;

callbagIterate(. value => {
  print_int(value);
})(. callbag); /* Prints 1 2 3 to the console. */
```

```typescript
import { pipe, fromArray, toCallbag } from 'wonka';

// This example uses the callbag-iterate package for illustrative purposes
import callbagIterate from 'callbag-iterate';

const callbag = pipe(
  fromArray([1, 2, 3]),
  toCallbag,
);

callbagIterate(value => console.log(value))(callbag);
// Prints 1 2 3 to the console.
```
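
For readers unfamiliar with callbags, the handshake that a callbag consumer performs can be sketched without any library. The code below is a hand-written illustration of the callbag signal convention (`0` to start, `1` for data, `2` to end) and is not how `toCallbag` is implemented; `arrayCallbag` and `iterateSketch` are hypothetical names, and the `Callbag` type is a loosely typed simplification.

```typescript
// Callbag signal types per the callbag spec: 0 = start, 1 = data, 2 = end.
// This loose type is a simplification for illustration only.
type Callbag = (type: 0 | 1 | 2, payload?: unknown) => void;

// A pullable callbag source over an array (hand-written, hypothetical helper).
function arrayCallbag<T>(items: T[]): Callbag {
  return (type, payload) => {
    if (type !== 0) return; // Sources only react to the initial greeting.
    const sink = payload as Callbag;
    let index = 0;
    let done = false;
    const talkback: Callbag = (t) => {
      if (done) return;
      if (t === 2) {
        done = true; // The sink asked to terminate.
      } else if (t === 1) {
        if (index < items.length) {
          sink(1, items[index++]);
        } else {
          done = true;
          sink(2);
        }
      }
    };
    sink(0, talkback); // Handshake: hand the talkback to the sink.
  };
}

// A sink in the spirit of callbag-iterate: pull, handle the value, pull again.
function iterateSketch<T>(onValue: (value: T) => void) {
  return (source: Callbag): void => {
    let talkback: Callbag = () => {};
    source(0, (type: 0 | 1 | 2, payload?: unknown) => {
      if (type === 0) {
        talkback = payload as Callbag;
        talkback(1);
      } else if (type === 1) {
        onValue(payload as T);
        talkback(1);
      }
    });
  };
}

const values: number[] = [];
iterateSketch<number>((value) => values.push(value))(arrayCallbag([1, 2, 3]));
console.log(values); // [ 1, 2, 3 ]
```

The start, data, and end signals here play roughly the same role as the talkback-based exchange between a Wonka source and sink, which is why the wrapper created by `toCallbag` can stay thin.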