1---
2title: Sinks
3order: 2
4---
5
6A sink in Wonka expects to be delivered data. A `sink` communicates with a source via the "talkback" function provided by the source. Wonka has the following `sink` operators.
7
8## subscribe
9
10`subscribe` accepts a callback function to execute when data is received from the source, in addition to the source itself.
11
12```reason
13Wonka.fromArray([|1, 2, 3|])
14 |> Wonka.subscribe((. x) => print_int(x));
15/* Prints 123 to the console. */
16```
17
18```typescript
19import { pipe, fromArray, subscribe } from 'wonka';
20
21pipe(
22 fromArray([1, 2, 3]),
23 subscribe((x) => console.log(x))
24); // Prints 123 to the console.
25```
26
27`subscribe` also returns a "subscription" type, which can be used to
28unsubscribe from the source. This allows you to cancel a source and stop receiving
29new incoming values.
30
31```reason
32let subscription = source
33 |> Wonka.subscribe((. x) => print_int(x));
34
35subscription.unsubscribe();
36```
37
38```typescript
39import { pipe, subscribe } from 'wonka';
40
41const [unsubscribe] = pipe(
42 source,
43 subscribe((x) => console.log(x));
44);
45
46unsubscribe();
47```
48
49## forEach
50
51`forEach` works the same as `subscribe` but doesn't return a subscription.
52It will just call the passed callback for each incoming value.
53
54```reason
55Wonka.fromArray([|1, 2, 3|])
56 |> Wonka.forEach((. x) => print_int(x));
57/* Returns unit; Prints 123 to the console. */
58```
59
60```typescript
61import { pipe, fromArray, forEach } from 'wonka';
62
63pipe(
64 fromArray([1, 2, 3]),
65 forEach((x) => console.log(x))
66); // Returns undefined; Prints 123 to the console.
67```
68
69## publish
70
71`publish` subscribes to a source, like `subscribe` does, but doesn't accept
72a callback function. It's useful for side-effects, where the values are already being
73used as part of the stream itself.
74
75In this example we're using [`onPush`](./operators.md#onpush) to pass a callback to react to incoming
76values instead.
77
78```reason
79Wonka.fromArray([|1, 2, 3|])
80 |> Wonka.onPush((. x) => print_int(x))
81 |> Wonka.publish;
82/* Prints 123 to the console. */
83```
84
85```typescript
86import { pipe, fromArray, onPush, publish } from 'wonka';
87
88pipe(
89 fromArray([1, 2, 3]),
90 onPush((x) => console.log(x)),
91 publish
92); // Prints 123 to the console.
93```
94
95## toArray
96
97`toArray` returns an array, which contains all values from a pull source.
98This sink is primarily intended for synchronous pull streams. Passing it
99an asynchronous push streams may result in an empty array being returned.
100
101If you're passing an asynchronous push stream `toArray` will cancel it
102before it returns an array.
103
104> _Note:_ If you're using this sink, make sure that your input source streams
105> the values you're collecting partly or fully synchronously.
106
107```reason
108Wonka.fromArray([|1, 2, 3|])
109 |> Wonka.map((. x) => x * 2)
110 |> Wonka.toArray
111/* Returns [|2, 4, 6|] */
112```
113
114```typescript
115import { pipe, fromArray, map, toArray } from 'wonka';
116
117pipe(
118 fromArray([1, 2, 3]),
119 map(x => x * 2),
120 toArray
121); // Returns [2, 4, 6]
122```
123
124## toPromise
125
126`toPromise` returns a promise, which resolves on the last value of a source.
127
128> _Note:_ This source is only available in JavaScript environments, and will be excluded
129> when compiling natively.
130
131```reason
132Wonka.fromArray([|1, 2, 3|])
133 |> Wonka.toPromise
134 |> Js.Promise.then_(x => {
135 print_int(x);
136 Js.Promise.resolve(())
137 })
138/* Prints 3 to the console. */
139```
140
141```typescript
142import { pipe, fromArray, toPromise } from 'wonka';
143
144const promise = pipe(
145 fromArray([1, 2, 3]),
146 toPromise,
147);
148
149promise.then(x => console.log(x));
150// Prints 3 to the console.
151```
152
153If you have a source that doesn't complete and are looking to resolve on the first
154value instead of the last, you may have to apply `take(1)` to your source.
155
156## toObservable
157
158`toObservable` returns a [spec-compliant JS Observable](https://github.com/tc39/proposal-observable), which emits the same
159values as a source.
160
161As per the specification, the Observable is annotated using `Symbol.observable`.
162
163> _Note:_ This sink is only available in JavaScript environments, and will be excluded
164> when compiling natively.
165
166```reason
167let observable = Wonka.fromArray([|1, 2, 3|])
168 |> Wonka.toObservable;
169
170observable##subscribe([@bs] {
171 as _;
172 pub next = value => print_int(value);
173 pub complete = () => ();
174 pub error = _ => ();
175}); /* Prints 1 2 3 to the console. */
176```
177
178```typescript
179import { pipe, fromArray, toObservable } from 'wonka';
180
181const observable = pipe(
182 fromArray([1, 2, 3]),
183 toObservable,
184);
185
186observable.subscribe({
187 next: value => console.log(value),
188 complete: () => {},
189 error: () => {},
190}); // Prints 1 2 3 to the console.
191```
192
193## toCallbag
194
195`toCallbag` returns a [spec-compliant JS Callbag](https://github.com/callbag/callbag), which emits the same signals
196as a Wonka source.
197
198Since Wonka's sources are very similar to callbags and only diverge from the specification
199minimally, Callbags map to Wonka's sources very closely and `toCallbag` only creates a thin
200wrapper which is mostly concerned with converting between the type signatures.
201
202> _Note:_ This sink is only available in JavaScript environments, and will be excluded
203> when compiling natively.
204
205```reason
206/* This example uses the callbag-iterate package for illustrative purposes */
207[@bs.module] external callbagIterate:
208 (. ('a => unit)) => (. Wonka.callbagT('a)) => unit = "callbag-iterate";
209
210let callbag = Wonka.fromArray([|1, 2, 3|])
211 |> Wonka.toCallbag;
212
213callbagIterate(. value => {
214 print_int(value);
215})(. callbag); /* Prints 1 2 3 to the console. */
216```
217
218```typescript
219import { pipe, fromArray, toCallbag } from 'wonka';
220
221// This example uses the callbag-iterate package for illustrative purposes
222import callbagIterate from 'callbag-iterate';
223
224const callbag = pipe(
225 fromArray([1, 2, 3]),
226 toCallbag,
227);
228
229callbagIterate(value => console.log(value))(callbag);
230// Prints 1 2 3 to the console.
231```