Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.2.0 14 kB view raw
1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'; 2import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers'; 3import { share } from './operators'; 4 5/** Helper creating a Source from a factory function when it's subscribed to. 6 * @param produce - A factory function returning a {@link Source}. 7 * @returns A {@link Source} lazyily subscribing to the Source returned by the given factory 8 * function. 9 * 10 * @remarks 11 * At times it's necessary to create a {@link Source} lazily. The time of a {@link Source} being 12 * created could be different from when it's subscribed to, and hence we may want to split the 13 * creation and subscription time. This is especially useful when the Source we wrap is "hot" and 14 * issues values as soon as it's created, which we may then not receive in a subscriber. 15 * 16 * @example An example of creating a {@link Source} that issues the timestamp of subscription. Here 17 * we effectively use `lazy` with the simple {@link fromValue | `fromValue`} source, to quickly 18 * create a Source that issues the time of its subscription, rather than the time of its creation 19 * that it would otherwise issue without `lazy`. 20 * 21 * ```ts 22 * lazy(() => fromValue(Date.now())); 23 * ``` 24 */ 25export function lazy<T>(produce: () => Source<T>): Source<T> { 26 return sink => produce()(sink); 27} 28 29/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested. 30 * 31 * @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper, 32 * which calls this helper automatically as needed. 33 * 34 * @param iterable - An {@link AsyncIterable | `AsyncIterable`}. 35 * @returns A {@link Source} issuing values sourced from the Iterable. 36 * 37 * @remarks 38 * `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given 39 * {@link AsyncIterable}. This can be used in many interoperability situations, including to consume 40 * an async generator function. 41 * 42 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it 43 * using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception. 44 * 45 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols} 46 * for the JS Iterable protocol. 47 */ 48export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> { 49 return sink => { 50 const iterator = iterable[Symbol.asyncIterator](); 51 let ended = false; 52 let looping = false; 53 let pulled = false; 54 let next: IteratorResult<T>; 55 sink( 56 start(async signal => { 57 if (signal === TalkbackKind.Close) { 58 ended = true; 59 if (iterator.return) iterator.return(); 60 } else if (looping) { 61 pulled = true; 62 } else { 63 for (pulled = looping = true; pulled && !ended; ) { 64 if ((next = await iterator.next()).done) { 65 ended = true; 66 if (iterator.return) await iterator.return(); 67 sink(SignalKind.End); 68 } else { 69 try { 70 pulled = false; 71 sink(push(next.value)); 72 } catch (error) { 73 if (iterator.throw) { 74 if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End); 75 } else { 76 throw error; 77 } 78 } 79 } 80 } 81 looping = false; 82 } 83 }) 84 ); 85 }; 86} 87 88/** Converts an Iterable to a Source that pulls and issues values from it as requested. 89 * @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper. 90 * @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable` 91 * @returns A {@link Source} issuing values sourced from the Iterable. 92 * 93 * @remarks 94 * `fromIterable` will create a {@link Source} that pulls and issues values from a given 95 * {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list 96 * of values in JS it can be applied to many different JS data types, including a JS Generator 97 * function. 98 * 99 * This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink} 100 * has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can 101 * therefore also be applied to "infinite" iterables, without a predefined end. 102 * 103 * This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the 104 * passed object also implements the async iterator protocol. 105 * 106 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it 107 * using {@link Iterator.throw}, which allows a generator to recover from the exception. 108 * 109 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol} 110 * for the JS Iterable protocol. 111 */ 112export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> { 113 if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>); 114 return sink => { 115 const iterator = iterable[Symbol.iterator](); 116 let ended = false; 117 let looping = false; 118 let pulled = false; 119 let next: IteratorResult<T>; 120 sink( 121 start(signal => { 122 if (signal === TalkbackKind.Close) { 123 ended = true; 124 if (iterator.return) iterator.return(); 125 } else if (looping) { 126 pulled = true; 127 } else { 128 for (pulled = looping = true; pulled && !ended; ) { 129 if ((next = iterator.next()).done) { 130 ended = true; 131 if (iterator.return) iterator.return(); 132 sink(SignalKind.End); 133 } else { 134 try { 135 pulled = false; 136 sink(push(next.value)); 137 } catch (error) { 138 if (iterator.throw) { 139 if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End); 140 } else { 141 throw error; 142 } 143 } 144 } 145 } 146 looping = false; 147 } 148 }) 149 ); 150 }; 151} 152 153/** Creates a Source that issues a each value of a given array synchronously. 154 * @see {@link fromIterable} which `fromArray` aliases. 155 * @param array - The array whose values will be issued one by one. 156 * @returns A {@link Source} issuing the array's values. 157 * 158 * @remarks 159 * `fromArray` will create a {@link Source} that issues the values of a given JS array one by one. It 160 * will issue values as they're pulled and is hence a "cold" source, not eagerly emitting values. It 161 * will end and issue the {@link SignalKind.End | End signal} when the array is exhausted of values. 162 * 163 * @example 164 * ```ts 165 * fromArray([1, 2, 3]); 166 * ``` 167 */ 168export const fromArray: <T>(array: T[]) => Source<T> = fromIterable; 169 170/** Creates a Source that issues a single value and ends immediately after. 171 * @param value - The value that will be issued. 172 * @returns A {@link Source} issuing the single value. 173 * 174 * @example 175 * ```ts 176 * fromValue('test'); 177 * ``` 178 */ 179export function fromValue<T>(value: T): Source<T> { 180 return sink => { 181 let ended = false; 182 sink( 183 start(signal => { 184 if (signal === TalkbackKind.Close) { 185 ended = true; 186 } else if (!ended) { 187 ended = true; 188 sink(push(value)); 189 sink(SignalKind.End); 190 } 191 }) 192 ); 193 }; 194} 195 196/** Creates a new Source from scratch from a passed `subscriber` function. 197 * @param subscriber - A callback that is called when the {@link Source} is subscribed to. 198 * @returns A {@link Source} created from the `subscriber` parameter. 199 * 200 * @remarks 201 * `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed 202 * `subscriber` function when it's subscribed to. 203 * 204 * The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to 205 * issue values on the Source, and {@link Observer.complete} to end the Source. 206 * 207 * Your `subscribr` function must return a {@link TeardownFn | teardown function} which is only 208 * called when your source is cancelled — not when you invoke `complete` yourself. As this creates a 209 * "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function 210 * again and create a new source. 211 * 212 * @example 213 * 214 * ```ts 215 * make(observer => { 216 * const frame = requestAnimationFrame(() => { 217 * observer.next('animate!'); 218 * }); 219 * return () => { 220 * cancelAnimationFrame(frame); 221 * }; 222 * }); 223 * ``` 224 */ 225export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> { 226 return sink => { 227 let ended = false; 228 const teardown = subscriber({ 229 next(value: T) { 230 if (!ended) sink(push(value)); 231 }, 232 complete() { 233 if (!ended) { 234 ended = true; 235 sink(SignalKind.End); 236 } 237 }, 238 }); 239 sink( 240 start(signal => { 241 if (signal === TalkbackKind.Close && !ended) { 242 ended = true; 243 teardown(); 244 } 245 }) 246 ); 247 }; 248} 249 250/** Creates a new Subject which can be used as an IO event hub. 251 * @returns A new {@link Subject}. 252 * 253 * @remarks 254 * `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer} 255 * combined in one interface, as the Observer is used to send new signals to the Source. This means 256 * that it's "hot" and hence all subscriptions to {@link Subject.source} share the same underlying 257 * signals coming from {@link Subject.next} and {@link Subject.complete}. 258 * 259 * @example 260 * ```ts 261 * const subject = makeSubject(); 262 * pipe(subject.source, subscribe(console.log)); 263 * // This will log the string on the above subscription 264 * subject.next('hello subject!'); 265 * ``` 266 */ 267export function makeSubject<T>(): Subject<T> { 268 let next: Subject<T>['next'] | void; 269 let complete: Subject<T>['complete'] | void; 270 return { 271 source: share( 272 make(observer => { 273 next = observer.next; 274 complete = observer.complete; 275 return teardownPlaceholder; 276 }) 277 ), 278 next(value: T) { 279 if (next) next(value); 280 }, 281 complete() { 282 if (complete) complete(); 283 }, 284 }; 285} 286 287/** A {@link Source} that immediately ends. 288 * @remarks 289 * `empty` is a {@link Source} that immediately issues an {@link SignalKind.End | End signal} when 290 * it's subscribed to, ending immediately. 291 * 292 * @see {@link never | `never`} for a source that instead never ends. 293 */ 294export const empty: Source<any> = (sink: Sink<any>): void => { 295 let ended = false; 296 sink( 297 start(signal => { 298 if (signal === TalkbackKind.Close) { 299 ended = true; 300 } else if (!ended) { 301 ended = true; 302 sink(SignalKind.End); 303 } 304 }) 305 ); 306}; 307 308/** A {@link Source} without values that never ends. 309 * @remarks 310 * `never` is a {@link Source} that never issues any signals and neither sends values nor ends. 311 * 312 * @see {@link empty | `empty`} for a source that instead ends immediately. 313 */ 314export const never: Source<any> = (sink: Sink<any>): void => { 315 sink(start(talkbackPlaceholder)); 316}; 317 318/** Creates a Source that issues an incrementing integer in intervals. 319 * @param ms - The interval in milliseconds. 320 * @returns A {@link Source} issuing an incrementing count on each interval. 321 * 322 * @remarks 323 * `interval` will create a {@link Source} that issues an incrementing counter each time the `ms` 324 * interval expires. 325 * 326 * It'll only stop when it's cancelled by a {@link TalkbackKind.Close | Close signal}. 327 * 328 * @example 329 * An example printing `0`, then `1`, and so on, in intervals of 50ms. 330 * 331 * ```ts 332 * pipe(interval(50), subscribe(console.log)); 333 * ``` 334 */ 335export function interval(ms: number): Source<number> { 336 return make(observer => { 337 let i = 0; 338 const id = setInterval(() => observer.next(i++), ms); 339 return () => clearInterval(id); 340 }); 341} 342 343/** Converts DOM Events to a Source given an `HTMLElement` and an event's name. 344 * @param element - The {@link HTMLElement} to listen to. 345 * @param event - The DOM Event name to listen to. 346 * @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM. 347 * 348 * @remarks 349 * `fromDomEvent` will create a {@link Source} that listens to the given element's events and issues 350 * them as values on the source. This source will only stop when it's cancelled by a 351 * {@link TalkbackKind.Close | Close signal}. 352 * 353 * @example 354 * An example printing `'clicked!'` when the given `#root` element is clicked. 355 * 356 * ```ts 357 * const element = document.getElementById('root'); 358 * pipe( 359 * fromDomEvent(element, 'click'), 360 * subscribe(() => console.log('clicked!')) 361 * ); 362 * ``` 363 */ 364export function fromDomEvent(element: HTMLElement, event: string): Source<Event> { 365 return make(observer => { 366 element.addEventListener(event, observer.next); 367 return () => element.removeEventListener(event, observer.next); 368 }); 369} 370 371/** Converts a Promise to a Source that issues the resolving Promise's value and then ends. 372 * @param promise - The promise that will be wrapped. 373 * @returns A {@link Source} issuing the promise's value when it resolves. 374 * 375 * @remarks 376 * `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value 377 * asynchronously and ends immediately after resolving. 378 * 379 * This helper will not handle the promise's exceptions, and will cause uncaught errors if the 380 * promise rejects without a value. 381 * 382 * @example 383 * An example printing `'resolved!'` when the given promise resolves after a tick. 384 * 385 * ```ts 386 * pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log)); 387 * ``` 388 */ 389export function fromPromise<T>(promise: Promise<T>): Source<T> { 390 return make(observer => { 391 promise.then(value => { 392 Promise.resolve(value).then(() => { 393 observer.next(value); 394 observer.complete(); 395 }); 396 }); 397 return teardownPlaceholder; 398 }); 399}