---
title: Sinks
order: 2
---

A sink in Wonka expects to be delivered data. A `sink` communicates with a source via the "talkback" function provided by the source. Wonka has the following `sink` operators.

## subscribe

`subscribe` accepts a callback function to execute when data is received from the source, in addition to the source itself.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.subscribe((. x) => print_int(x));
/* Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, subscribe } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  subscribe((x) => console.log(x))
); // Prints 123 to the console.
```

`subscribe` also returns a "subscription" type, which can be used to
unsubscribe from the source. This allows you to cancel a source and stop receiving
new incoming values.

```reason
let subscription = source
  |> Wonka.subscribe((. x) => print_int(x));

subscription.unsubscribe();
```

```typescript
import { pipe, subscribe } from 'wonka';

const [unsubscribe] = pipe(
  source,
  subscribe((x) => console.log(x))
);

unsubscribe();
```

## forEach

`forEach` works the same as `subscribe` but doesn't return a subscription.
It will just call the passed callback for each incoming value.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.forEach((. x) => print_int(x));
/* Returns unit; Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, forEach } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  forEach((x) => console.log(x))
); // Returns undefined; Prints 123 to the console.
```

## publish

`publish` subscribes to a source, like `subscribe` does, but doesn't accept
a callback function. It's useful for side-effects, where the values are already being
used as part of the stream itself.

In this example we're using [`onPush`](./operators.md#onpush) to pass a callback to react to incoming
values instead.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.onPush((. x) => print_int(x))
  |> Wonka.publish;
/* Prints 123 to the console. */
```

```typescript
import { pipe, fromArray, onPush, publish } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  onPush((x) => console.log(x)),
  publish
); // Prints 123 to the console.
```

## toArray

`toArray` returns an array, which contains all values from a pull source.
This sink is primarily intended for synchronous pull streams. Passing it
an asynchronous push stream may result in an empty array being returned.

If you're passing an asynchronous push stream, `toArray` will cancel it
before it returns an array.

> _Note:_ If you're using this sink, make sure that your input source streams
> the values you're collecting partly or fully synchronously.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.map((. x) => x * 2)
  |> Wonka.toArray
/* Returns [|2, 4, 6|] */
```

```typescript
import { pipe, fromArray, map, toArray } from 'wonka';

pipe(
  fromArray([1, 2, 3]),
  map(x => x * 2),
  toArray
); // Returns [2, 4, 6]
```

## toPromise

`toPromise` returns a promise, which resolves on the last value of a source.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toPromise
  |> Js.Promise.then_(x => {
    print_int(x);
    Js.Promise.resolve(())
  })
/* Prints 3 to the console. */
```

```typescript
import { pipe, fromArray, toPromise } from 'wonka';

const promise = pipe(
  fromArray([1, 2, 3]),
  toPromise,
);

promise.then(x => console.log(x));
// Prints 3 to the console.
```

If you have a source that doesn't complete and are looking to resolve on the first
value instead of the last, you may have to apply `take(1)` to your source.

## toObservable

`toObservable` returns a [spec-compliant JS Observable](https://github.com/tc39/proposal-observable), which emits the same
values as a source.

As per the specification, the Observable is annotated using `Symbol.observable`.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
let observable = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toObservable;

observable##subscribe([@bs] {
  as _;
  pub next = value => print_int(value);
  pub complete = () => ();
  pub error = _ => ();
}); /* Prints 1 2 3 to the console. */
```

```typescript
import { pipe, fromArray, toObservable } from 'wonka';

const observable = pipe(
  fromArray([1, 2, 3]),
  toObservable,
);

observable.subscribe({
  next: value => console.log(value),
  complete: () => {},
  error: () => {},
}); // Prints 1 2 3 to the console.
```

## toCallbag

`toCallbag` returns a [spec-compliant JS Callbag](https://github.com/callbag/callbag), which emits the same signals
as a Wonka source.

Since Wonka's sources are very similar to callbags and only diverge from the specification
minimally, Callbags map to Wonka's sources very closely and `toCallbag` only creates a thin
wrapper which is mostly concerned with converting between the type signatures.

> _Note:_ This sink is only available in JavaScript environments, and will be excluded
> when compiling natively.

```reason
/* This example uses the callbag-iterate package for illustrative purposes */
[@bs.module] external callbagIterate:
  (. ('a => unit)) => (. Wonka.callbagT('a)) => unit = "callbag-iterate";

let callbag = Wonka.fromArray([|1, 2, 3|])
  |> Wonka.toCallbag;

callbagIterate(. value => {
  print_int(value);
})(. callbag); /* Prints 1 2 3 to the console. */
```

```typescript
import { pipe, fromArray, toCallbag } from 'wonka';

// This example uses the callbag-iterate package for illustrative purposes
import callbagIterate from 'callbag-iterate';

const callbag = pipe(
  fromArray([1, 2, 3]),
  toCallbag,
);

callbagIterate(value => console.log(value))(callbag);
// Prints 1 2 3 to the console.
```
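
To make the handshake that a callbag consumer performs more concrete, here is a minimal, dependency-free sketch of the callbag protocol itself (signal `0` for the start handshake, `1` for data, `2` for end). `arrayCallbag` and `collect` are hypothetical helpers written only for this illustration; they are not part of Wonka or of any callbag package, and the sketch assumes a pullable source rather than showing how `toCallbag` is implemented.

```typescript
// Callbag signal types, following the callbag spec:
// 0 = start (handshake), 1 = data, 2 = end.
type Callbag = (type: 0 | 1 | 2, payload?: any) => void;

// Hypothetical helper: a pullable callbag source over an array.
function arrayCallbag<T>(values: T[]): Callbag {
  return (type, sink) => {
    if (type !== 0) return; // a source only reacts to the start signal
    let index = 0;
    let ended = false;

    // The talkback lets the sink request data (1) or cancel (2).
    const talkback: Callbag = (t) => {
      if (ended) return;
      if (t === 1) {
        if (index < values.length) {
          sink(1, values[index++]); // deliver one value per pull
        } else {
          ended = true;
          sink(2); // no values left: signal the end
        }
      } else if (t === 2) {
        ended = true; // the sink cancelled the source
      }
    };

    sink(0, talkback); // handshake: hand the talkback to the sink
  };
}

// Hypothetical helper: a sink that pulls values until the source ends.
function collect<T>(source: Callbag): T[] {
  const out: T[] = [];
  let talkback: Callbag | undefined;

  source(0, (type: 0 | 1 | 2, payload?: any) => {
    if (type === 0) {
      talkback = payload; // store the talkback received in the handshake
      talkback?.(1); // request the first value
    } else if (type === 1) {
      out.push(payload);
      talkback?.(1); // request the next value
    }
    // type === 2: the source ended, nothing more to do
  });

  return out;
}

console.log(collect<number>(arrayCallbag([1, 2, 3]))); // [ 1, 2, 3 ]
```

The sink drives the stream here: nothing is emitted until it calls the talkback with `1`, which mirrors the pull behaviour described for synchronous sources above.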