// Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
import {
  push,
  start,
  talkbackPlaceholder,
  teardownPlaceholder,
  asyncIteratorSymbol,
} from './helpers';
import { share } from './operators';

/** Helper creating a Source from a factory function when it's subscribed to.
 * @param produce - A factory function returning a {@link Source}.
 * @returns A {@link Source} lazily subscribing to the Source returned by the given factory
 * function.
 *
 * @remarks
 * At times it's necessary to create a {@link Source} lazily. The time of a {@link Source} being
 * created could be different from when it's subscribed to, and hence we may want to split the
 * creation and subscription time. This is especially useful when the Source we wrap is "hot" and
 * issues values as soon as it's created, which we may then not receive in a subscriber.
 *
 * @example An example of creating a {@link Source} that issues the timestamp of subscription. Here
 * we effectively use `lazy` with the simple {@link fromValue | `fromValue`} source, to quickly
 * create a Source that issues the time of its subscription, rather than the time of its creation
 * that it would otherwise issue without `lazy`.
 *
 * ```ts
 * lazy(() => fromValue(Date.now()));
 * ```
 */
export function lazy<T>(produce: () => Source<T>): Source<T> {
  // `produce` is invoked once per subscription, so every sink gets a freshly created source.
  return sink => produce()(sink);
}

/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested.
 *
 * @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper,
 * which calls this helper automatically as needed.
 *
 * @param iterable - An {@link AsyncIterable | `AsyncIterable`}.
 * @returns A {@link Source} issuing values sourced from the Iterable.
 *
 * @remarks
 * `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given
 * {@link AsyncIterable}. This can be used in many interoperability situations, including to consume
 * an async generator function.
 *
 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
 * using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
 * for the JS Iterable protocol.
 */
export function fromAsyncIterable<T>(iterable: AsyncIterable<T> | AsyncIterator<T>): Source<T> {
  return sink => {
    // Accept either an async iterable (obtain its iterator) or an object that is used
    // directly as the iterator when it has no async-iterator method.
    const iterator: AsyncIterator<T> =
      (iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable;

    let ended = false;
    let looping = false;
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(async signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          // A loop is already draining the iterator; only record the pull so the running
          // loop performs another iteration instead of starting a second loop.
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            if ((next = await iterator.next()).done) {
              ended = true;
              if (iterator.return) await iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so that a pull received during the push is
                // observed by the loop condition afterwards.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                if (iterator.throw) {
                  // Hand the sink's exception to the iterator; end the source if the
                  // iterator reports completion in response.
                  if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/** Converts an Iterable to a Source that pulls and issues values from it as requested.
 * @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper.
 * @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable`
 * @returns A {@link Source} issuing values sourced from the Iterable.
 *
 * @remarks
 * `fromIterable` will create a {@link Source} that pulls and issues values from a given
 * {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list
 * of values in JS it can be applied to many different JS data types, including a JS Generator
 * function.
 *
 * This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink}
 * has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can
 * therefore also be applied to "infinite" iterables, without a predefined end.
 *
 * This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the
 * passed object also implements the async iterator protocol.
 *
 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
 * using {@link Iterator.throw}, which allows a generator to recover from the exception.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol}
 * for the JS Iterable protocol.
 */
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
  // Objects exposing an async iterator are delegated to the async implementation.
  if (iterable[asyncIteratorSymbol()]) return fromAsyncIterable(iterable as AsyncIterable<T>);
  return sink => {
    const iterator = iterable[Symbol.iterator]();
    let ended = false;
    let looping = false;
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          // Re-entrant pull while the loop below is running: just flag it so the loop
          // iterates again rather than recursing.
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            if ((next = iterator.next()).done) {
              ended = true;
              if (iterator.return) iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so that a pull issued during the push is
                // picked up by the loop condition.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                if (iterator.throw) {
                  // Forward the sink's exception to the iterator; end the source if the
                  // iterator reports completion in response.
                  if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/** Creates a Source that issues each value of a given array synchronously.
 * @see {@link fromIterable} which `fromArray` aliases.
 * @param array - The array whose values will be issued one by one.
 * @returns A {@link Source} issuing the array's values.
 *
 * @remarks
 * `fromArray` will create a {@link Source} that issues the values of a given JS array one by one. It
 * will issue values as they're pulled and is hence a "cold" source, not eagerly emitting values. It
 * will end and issue the {@link SignalKind.End | End signal} when the array is exhausted of values.
 *
 * @example
 * ```ts
 * fromArray([1, 2, 3]);
 * ```
 */
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;

/** Creates a Source that issues a single value and ends immediately after.
 * @param value - The value that will be issued.
 * @returns A {@link Source} issuing the single value.
 *
 * @example
 * ```ts
 * fromValue('test');
 * ```
 */
export function fromValue<T>(value: T): Source<T> {
  return sink => {
    let ended = false;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
        } else if (!ended) {
          // First non-Close signal: mark as ended up front so later signals are ignored,
          // then issue the value followed by the End signal.
          ended = true;
          sink(push(value));
          sink(SignalKind.End);
        }
      })
    );
  };
}

/** Creates a new Source from scratch from a passed `subscriber` function.
 * @param subscriber - A callback that is called when the {@link Source} is subscribed to.
 * @returns A {@link Source} created from the `subscriber` parameter.
 *
 * @remarks
 * `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed
 * `subscriber` function when it's subscribed to.
 *
 * The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to
 * issue values on the Source, and {@link Observer.complete} to end the Source.
 *
 * Your `subscriber` function must return a {@link TeardownFn | teardown function} which is only
 * called when your source is cancelled — not when you invoke `complete` yourself. As this creates a
 * "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function
 * again and create a new source.
 *
 * @example
 *
 * ```ts
 * make(observer => {
 *   const frame = requestAnimationFrame(() => {
 *     observer.next('animate!');
 *   });
 *   return () => {
 *     cancelAnimationFrame(frame);
 *   };
 * });
 * ```
 */
export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> {
  return sink => {
    let ended = false;
    const teardown = subscriber({
      next(value: T) {
        if (!ended) sink(push(value));
      },
      complete() {
        if (!ended) {
          ended = true;
          sink(SignalKind.End);
        }
      },
    });
    sink(
      start(signal => {
        // Teardown runs at most once, and only for a Close that arrives before the
        // source has ended (either by `complete` or by an earlier Close).
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          teardown();
        }
      })
    );
  };
}

/** Creates a new Subject which can be used as an IO event hub.
 * @returns A new {@link Subject}.
 *
 * @remarks
 * `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer}
 * combined in one interface, as the Observer is used to send new signals to the Source. This means
 * that it's "hot" and hence all subscriptions to {@link Subject.source} share the same underlying
 * signals coming from {@link Subject.next} and {@link Subject.complete}.
 *
 * @example
 * ```ts
 * const subject = makeSubject();
 * pipe(subject.source, subscribe(console.log));
 * // This will log the string on the above subscription
 * subject.next('hello subject!');
 * ```
 */
export function makeSubject<T>(): Subject<T> {
  // Filled in once the `make` subscriber below has run; until then `next` and
  // `complete` on the returned Subject are no-ops.
  let next: Subject<T>['next'] | void;
  let complete: Subject<T>['complete'] | void;
  return {
    source: share(
      make(observer => {
        next = observer.next;
        complete = observer.complete;
        return teardownPlaceholder;
      })
    ),
    next(value: T) {
      if (next) next(value);
    },
    complete() {
      if (complete) complete();
    },
  };
}

/** A {@link Source} that immediately ends.
 * @remarks
 * `empty` is a {@link Source} that immediately issues an {@link SignalKind.End | End signal} when
 * it's subscribed to, ending immediately.
 *
 * @see {@link never | `never`} for a source that instead never ends.
 */
export const empty: Source<any> = (sink: Sink<any>): void => {
  let ended = false;
  sink(
    start(signal => {
      if (signal === TalkbackKind.Close) {
        ended = true;
      } else if (!ended) {
        // The End signal is issued in response to the first non-Close talkback signal.
        ended = true;
        sink(SignalKind.End);
      }
    })
  );
};

/** A {@link Source} without values that never ends.
 * @remarks
 * `never` is a {@link Source} that never issues any signals and neither sends values nor ends.
 *
 * @see {@link empty | `empty`} for a source that instead ends immediately.
 */
export const never: Source<any> = (sink: Sink<any>): void => {
  sink(start(talkbackPlaceholder));
};

/** Creates a Source that issues an incrementing integer in intervals.
 * @param ms - The interval in milliseconds.
 * @returns A {@link Source} issuing an incrementing count on each interval.
 *
 * @remarks
 * `interval` will create a {@link Source} that issues an incrementing counter each time the `ms`
 * interval expires.
 *
 * It'll only stop when it's cancelled by a {@link TalkbackKind.Close | Close signal}.
 *
 * @example
 * An example printing `0`, then `1`, and so on, in intervals of 50ms.
 *
 * ```ts
 * pipe(interval(50), subscribe(console.log));
 * ```
 */
export function interval(ms: number): Source<number> {
  return make(observer => {
    let i = 0;
    const id = setInterval(() => observer.next(i++), ms);
    // The returned teardown clears the timer.
    return () => clearInterval(id);
  });
}

/** Converts DOM Events to a Source given an `HTMLElement` and an event's name.
 * @param element - The {@link HTMLElement} to listen to.
 * @param event - The DOM Event name to listen to.
 * @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM.
 *
 * @remarks
 * `fromDomEvent` will create a {@link Source} that listens to the given element's events and issues
 * them as values on the source. This source will only stop when it's cancelled by a
 * {@link TalkbackKind.Close | Close signal}.
 *
 * @example
 * An example printing `'clicked!'` when the given `#root` element is clicked.
 *
 * ```ts
 * const element = document.getElementById('root');
 * pipe(
 *   fromDomEvent(element, 'click'),
 *   subscribe(() => console.log('clicked!'))
 * );
 * ```
 */
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
  return make(observer => {
    // The same `observer.next` reference is used for add and remove so that the
    // teardown detaches exactly the listener registered here.
    element.addEventListener(event, observer.next);
    return () => element.removeEventListener(event, observer.next);
  });
}

/** Converts a Promise to a Source that issues the resolving Promise's value and then ends.
 * @param promise - The promise that will be wrapped.
 * @returns A {@link Source} issuing the promise's value when it resolves.
 *
 * @remarks
 * `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value
 * asynchronously and ends immediately after resolving.
 *
 * This helper will not handle the promise's exceptions, and will cause uncaught errors if the
 * promise rejects without a value.
 *
 * @example
 * An example printing `'resolved!'` when the given promise resolves after a tick.
 *
 * ```ts
 * pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log));
 * ```
 */
export function fromPromise<T>(promise: Promise<T>): Source<T> {
  return make(observer => {
    promise.then(value => {
      // The value is issued from a second, already-resolved promise's callback rather
      // than directly in this one. NOTE(review): the reason for the extra hop is not
      // visible here — confirm before simplifying.
      Promise.resolve(value).then(() => {
        observer.next(value);
        observer.complete();
      });
    });
    return teardownPlaceholder;
  });
}