---
title: Sinks
order: 2
---

A sink in Wonka consumes the data that a source delivers. A sink communicates with its source via the "talkback" function that the source provides. Wonka has the following sink operators.
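
To make the talkback idea concrete, here is a minimal, self-contained sketch of a sink that requests values from a source through a talkback function. The types and the `arraySource` and `collect` helpers are hypothetical simplifications written for this illustration; they are not Wonka's actual type definitions or implementation.

```typescript
// Simplified, illustrative model of a talkback protocol (not Wonka's real types).
type Talkback = (request: 'pull' | 'close') => void;

type Signal<T> =
  | { kind: 'start'; talkback: Talkback }
  | { kind: 'push'; value: T }
  | { kind: 'end' };

type Sink<T> = (signal: Signal<T>) => void;
type Source<T> = (sink: Sink<T>) => void;

// Hypothetical pull source: emits one array item per "pull" request.
function arraySource<T>(items: T[]): Source<T> {
  return (sink) => {
    let index = 0;
    let closed = false;
    sink({
      kind: 'start',
      talkback: (request) => {
        if (closed) return;
        if (request === 'close') {
          closed = true;
        } else if (index < items.length) {
          sink({ kind: 'push', value: items[index++] });
        } else {
          closed = true;
          sink({ kind: 'end' });
        }
      },
    });
  };
}

// Hypothetical sink: stores the talkback on start and pulls after every value.
function collect<T>(source: Source<T>): T[] {
  const values: T[] = [];
  let talkback: Talkback = () => {};
  source((signal) => {
    if (signal.kind === 'start') {
      talkback = signal.talkback;
      talkback('pull');
    } else if (signal.kind === 'push') {
      values.push(signal.value);
      talkback('pull');
    }
  });
  return values;
}

const collected = collect(arraySource([1, 2, 3]));
console.log(collected); // [1, 2, 3]
```

The sink never reaches into the source directly: everything it asks for goes through the talkback it received with the start signal.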

## subscribe

`subscribe` accepts a callback function, which is called for each value received from the source, in addition to the source itself.

```typescript
import { pipe, fromArray, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  subscribe((x) => console.log(x))
); // Prints 1, 2, 3 to the console.
```

`subscribe` also returns a "subscription" object, whose `unsubscribe` method can be used to
unsubscribe from the source. This allows you to cancel a source and stop receiving
new incoming values.

```typescript
import { pipe, interval, subscribe } from 'wonka';

// Any source works here; `interval` is only used for illustration.
const source = interval(1000);

const { unsubscribe } = pipe(
  source,
  subscribe((x) => console.log(x))
);

unsubscribe();
```
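
The effect of `unsubscribe` can be observed with a small sketch like the following. It assumes Wonka's `makeSubject` helper (not covered in this section) to push values by hand; the `received` array is only there for illustration.

```typescript
import { pipe, makeSubject, subscribe } from 'wonka';

// A subject lets us push values into a source manually.
const subject = makeSubject<number>();
const received: number[] = [];

const { unsubscribe } = pipe(
  subject.source,
  subscribe((x) => {
    received.push(x);
  })
);

subject.next(1);
subject.next(2);

unsubscribe();

// Pushed after unsubscribing, so the callback is not called for it.
subject.next(3);

console.log(received); // [1, 2]
```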

## forEach

`forEach` works the same as `subscribe` but doesn't return a subscription.
It will just call the passed callback for each incoming value.

```typescript
import { pipe, fromArray, forEach } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  forEach((x) => console.log(x))
); // Returns undefined; prints 1, 2, 3 to the console.
```

## publish

`publish` subscribes to a source, like `subscribe` does, but doesn't accept
a callback function. It's useful for side-effects, where the values are already being
used as part of the stream itself.

In this example we're using [`onPush`](./operators.md#onpush) to pass a callback to react to incoming
values instead.

```typescript
import { pipe, fromArray, onPush, publish } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  onPush((x) => console.log(x)),
  publish
); // Prints 1, 2, 3 to the console.
```

## toArray

`toArray` returns an array, which contains all values from a pull source.
This sink is primarily intended for synchronous pull streams. Passing it
an asynchronous push stream may result in an empty array being returned.

If you're passing an asynchronous push stream, `toArray` will cancel it
before it returns an array.

> _Note:_ If you're using this sink, make sure that your input source streams
> the values you're collecting partly or fully synchronously.

```typescript
import { pipe, fromArray, map, toArray } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  map((x) => x * 2),
  toArray
); // Returns [2, 4, 6]
```
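
The caveat about asynchronous push streams can be illustrated with a short sketch that compares a synchronous source with Wonka's `interval` source, which only emits on a timer. The variable names are illustrative.

```typescript
import { pipe, fromArray, interval, toArray } from 'wonka';

// A synchronous source delivers all of its values before `toArray` returns.
const syncValues = pipe(fromArray([1, 2, 3]), toArray);

// `interval` emits nothing synchronously, so `toArray` cancels it
// and returns whatever it has collected so far, which is nothing.
const asyncValues = pipe(interval(10), toArray);

console.log(syncValues); // [1, 2, 3]
console.log(asyncValues); // []
```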

## toPromise

`toPromise` returns a promise, which resolves on the last value of a source.

```typescript
import { pipe, fromArray, toPromise } from 'wonka';

const promise = pipe(fromArray([1, 2, 3]), toPromise);

promise.then((x) => console.log(x));
// Prints 3 to the console.
```

If you have a source that doesn't complete and are looking to resolve on the first
value instead of the last, you may have to apply `take(1)` to your source.
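
As a sketch of that pattern, the following uses Wonka's `interval` source as a stand-in for a source that never completes on its own, and the `take` operator to end it after the first value. The 10 ms delay and the `first` name are arbitrary choices for this example.

```typescript
import { pipe, interval, take, toPromise } from 'wonka';

// `interval` never completes by itself, so `toPromise` alone would never resolve.
// `take(1)` ends the stream after the first value, which lets the promise resolve.
const first = pipe(interval(10), take(1), toPromise);

// Logs the first value emitted by the interval.
first.then((x) => console.log(x));
```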

## toObservable

`toObservable` returns a [spec-compliant JS Observable](https://github.com/tc39/proposal-observable), which emits the same
values as a source.

As per the specification, the Observable is annotated using `Symbol.observable`.

```typescript
import { pipe, fromArray, toObservable } from 'wonka';

const observable = pipe(fromArray([1, 2, 3]), toObservable);

observable.subscribe({
  next: (value) => console.log(value),
  complete: () => {},
  error: () => {},
}); // Prints 1, 2, 3 to the console.
```

## toCallbag

`toCallbag` returns a [spec-compliant JS Callbag](https://github.com/callbag/callbag), which emits the same signals
as a Wonka source.

Wonka's sources are very similar to callbags and diverge from the specification only
minimally. Because the two map so closely, `toCallbag` creates just a thin wrapper,
which is mostly concerned with converting between the type signatures.

```typescript
import { pipe, fromArray, toCallbag } from 'wonka';

// This example uses the callbag-iterate package for illustrative purposes
import callbagIterate from 'callbag-iterate';

const callbag = pipe(fromArray([1, 2, 3]), toCallbag);

callbagIterate((value) => console.log(value))(callbag);
// Prints 1, 2, 3 to the console.
```