1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
2import {
3 push,
4 start,
5 talkbackPlaceholder,
6 teardownPlaceholder,
7 asyncIteratorSymbol,
8} from './helpers';
9import { share } from './operators';
10
11/** Helper creating a Source from a factory function when it's subscribed to.
12 * @param produce - A factory function returning a {@link Source}.
13 * @returns A {@link Source} lazyily subscribing to the Source returned by the given factory
14 * function.
15 *
16 * @remarks
17 * At times it's necessary to create a {@link Source} lazily. The time of a {@link Source} being
18 * created could be different from when it's subscribed to, and hence we may want to split the
19 * creation and subscription time. This is especially useful when the Source we wrap is "hot" and
20 * issues values as soon as it's created, which we may then not receive in a subscriber.
21 *
22 * @example An example of creating a {@link Source} that issues the timestamp of subscription. Here
23 * we effectively use `lazy` with the simple {@link fromValue | `fromValue`} source, to quickly
24 * create a Source that issues the time of its subscription, rather than the time of its creation
25 * that it would otherwise issue without `lazy`.
26 *
27 * ```ts
28 * lazy(() => fromValue(Date.now()));
29 * ```
30 */
31export function lazy<T>(produce: () => Source<T>): Source<T> {
32 return sink => produce()(sink);
33}
34
35/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested.
36 *
37 * @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper,
38 * which calls this helper automatically as needed.
39 *
40 * @param iterable - An {@link AsyncIterable | `AsyncIterable`}.
41 * @returns A {@link Source} issuing values sourced from the Iterable.
42 *
43 * @remarks
44 * `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given
45 * {@link AsyncIterable}. This can be used in many interoperability situations, including to consume
46 * an async generator function.
47 *
48 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
49 * using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception.
50 *
51 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
52 * for the JS Iterable protocol.
53 */
54export function fromAsyncIterable<T>(iterable: AsyncIterable<T> | AsyncIterator<T>): Source<T> {
55 return sink => {
56 const iterator: AsyncIterator<T> =
57 (iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable;
58
59 let ended = false;
60 let looping = false;
61 let pulled = false;
62 let next: IteratorResult<T>;
63 sink(
64 start(async signal => {
65 if (signal === TalkbackKind.Close) {
66 ended = true;
67 if (iterator.return) iterator.return();
68 } else if (looping) {
69 pulled = true;
70 } else {
71 for (pulled = looping = true; pulled && !ended; ) {
72 if ((next = await iterator.next()).done) {
73 ended = true;
74 if (iterator.return) await iterator.return();
75 sink(SignalKind.End);
76 } else {
77 try {
78 pulled = false;
79 sink(push(next.value));
80 } catch (error) {
81 if (iterator.throw) {
82 if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End);
83 } else {
84 throw error;
85 }
86 }
87 }
88 }
89 looping = false;
90 }
91 })
92 );
93 };
94}
95
96/** Converts an Iterable to a Source that pulls and issues values from it as requested.
97 * @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper.
98 * @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable`
99 * @returns A {@link Source} issuing values sourced from the Iterable.
100 *
101 * @remarks
102 * `fromIterable` will create a {@link Source} that pulls and issues values from a given
103 * {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list
104 * of values in JS it can be applied to many different JS data types, including a JS Generator
105 * function.
106 *
107 * This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink}
108 * has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can
109 * therefore also be applied to "infinite" iterables, without a predefined end.
110 *
111 * This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the
112 * passed object also implements the async iterator protocol.
113 *
114 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
115 * using {@link Iterator.throw}, which allows a generator to recover from the exception.
116 *
117 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol}
118 * for the JS Iterable protocol.
119 */
120export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
121 if (iterable[asyncIteratorSymbol()]) return fromAsyncIterable(iterable as AsyncIterable<T>);
122 return sink => {
123 const iterator = iterable[Symbol.iterator]();
124 let ended = false;
125 let looping = false;
126 let pulled = false;
127 let next: IteratorResult<T>;
128 sink(
129 start(signal => {
130 if (signal === TalkbackKind.Close) {
131 ended = true;
132 if (iterator.return) iterator.return();
133 } else if (looping) {
134 pulled = true;
135 } else {
136 for (pulled = looping = true; pulled && !ended; ) {
137 if ((next = iterator.next()).done) {
138 ended = true;
139 if (iterator.return) iterator.return();
140 sink(SignalKind.End);
141 } else {
142 try {
143 pulled = false;
144 sink(push(next.value));
145 } catch (error) {
146 if (iterator.throw) {
147 if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
148 } else {
149 throw error;
150 }
151 }
152 }
153 }
154 looping = false;
155 }
156 })
157 );
158 };
159}
160
161/** Creates a Source that issues a each value of a given array synchronously.
162 * @see {@link fromIterable} which `fromArray` aliases.
163 * @param array - The array whose values will be issued one by one.
164 * @returns A {@link Source} issuing the array's values.
165 *
166 * @remarks
167 * `fromArray` will create a {@link Source} that issues the values of a given JS array one by one. It
168 * will issue values as they're pulled and is hence a "cold" source, not eagerly emitting values. It
169 * will end and issue the {@link SignalKind.End | End signal} when the array is exhausted of values.
170 *
171 * @example
172 * ```ts
173 * fromArray([1, 2, 3]);
174 * ```
175 */
176export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
177
178/** Creates a Source that issues a single value and ends immediately after.
179 * @param value - The value that will be issued.
180 * @returns A {@link Source} issuing the single value.
181 *
182 * @example
183 * ```ts
184 * fromValue('test');
185 * ```
186 */
187export function fromValue<T>(value: T): Source<T> {
188 return sink => {
189 let ended = false;
190 sink(
191 start(signal => {
192 if (signal === TalkbackKind.Close) {
193 ended = true;
194 } else if (!ended) {
195 ended = true;
196 sink(push(value));
197 sink(SignalKind.End);
198 }
199 })
200 );
201 };
202}
203
204/** Creates a new Source from scratch from a passed `subscriber` function.
205 * @param subscriber - A callback that is called when the {@link Source} is subscribed to.
206 * @returns A {@link Source} created from the `subscriber` parameter.
207 *
208 * @remarks
209 * `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed
210 * `subscriber` function when it's subscribed to.
211 *
212 * The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to
213 * issue values on the Source, and {@link Observer.complete} to end the Source.
214 *
215 * Your `subscribr` function must return a {@link TeardownFn | teardown function} which is only
216 * called when your source is cancelled — not when you invoke `complete` yourself. As this creates a
217 * "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function
218 * again and create a new source.
219 *
220 * @example
221 *
222 * ```ts
223 * make(observer => {
224 * const frame = requestAnimationFrame(() => {
225 * observer.next('animate!');
226 * });
227 * return () => {
228 * cancelAnimationFrame(frame);
229 * };
230 * });
231 * ```
232 */
233export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> {
234 return sink => {
235 let ended = false;
236 const teardown = subscriber({
237 next(value: T) {
238 if (!ended) sink(push(value));
239 },
240 complete() {
241 if (!ended) {
242 ended = true;
243 sink(SignalKind.End);
244 }
245 },
246 });
247 sink(
248 start(signal => {
249 if (signal === TalkbackKind.Close && !ended) {
250 ended = true;
251 teardown();
252 }
253 })
254 );
255 };
256}
257
258/** Creates a new Subject which can be used as an IO event hub.
259 * @returns A new {@link Subject}.
260 *
261 * @remarks
262 * `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer}
263 * combined in one interface, as the Observer is used to send new signals to the Source. This means
264 * that it's "hot" and hence all subscriptions to {@link Subject.source} share the same underlying
265 * signals coming from {@link Subject.next} and {@link Subject.complete}.
266 *
267 * @example
268 * ```ts
269 * const subject = makeSubject();
270 * pipe(subject.source, subscribe(console.log));
271 * // This will log the string on the above subscription
272 * subject.next('hello subject!');
273 * ```
274 */
275export function makeSubject<T>(): Subject<T> {
276 let next: Subject<T>['next'] | void;
277 let complete: Subject<T>['complete'] | void;
278 return {
279 source: share(
280 make(observer => {
281 next = observer.next;
282 complete = observer.complete;
283 return teardownPlaceholder;
284 })
285 ),
286 next(value: T) {
287 if (next) next(value);
288 },
289 complete() {
290 if (complete) complete();
291 },
292 };
293}
294
295/** A {@link Source} that immediately ends.
296 * @remarks
297 * `empty` is a {@link Source} that immediately issues an {@link SignalKind.End | End signal} when
298 * it's subscribed to, ending immediately.
299 *
300 * @see {@link never | `never`} for a source that instead never ends.
301 */
302export const empty: Source<any> = (sink: Sink<any>): void => {
303 let ended = false;
304 sink(
305 start(signal => {
306 if (signal === TalkbackKind.Close) {
307 ended = true;
308 } else if (!ended) {
309 ended = true;
310 sink(SignalKind.End);
311 }
312 })
313 );
314};
315
316/** A {@link Source} without values that never ends.
317 * @remarks
318 * `never` is a {@link Source} that never issues any signals and neither sends values nor ends.
319 *
320 * @see {@link empty | `empty`} for a source that instead ends immediately.
321 */
322export const never: Source<any> = (sink: Sink<any>): void => {
323 sink(start(talkbackPlaceholder));
324};
325
326/** Creates a Source that issues an incrementing integer in intervals.
327 * @param ms - The interval in milliseconds.
328 * @returns A {@link Source} issuing an incrementing count on each interval.
329 *
330 * @remarks
331 * `interval` will create a {@link Source} that issues an incrementing counter each time the `ms`
332 * interval expires.
333 *
334 * It'll only stop when it's cancelled by a {@link TalkbackKind.Close | Close signal}.
335 *
336 * @example
337 * An example printing `0`, then `1`, and so on, in intervals of 50ms.
338 *
339 * ```ts
340 * pipe(interval(50), subscribe(console.log));
341 * ```
342 */
343export function interval(ms: number): Source<number> {
344 return make(observer => {
345 let i = 0;
346 const id = setInterval(() => observer.next(i++), ms);
347 return () => clearInterval(id);
348 });
349}
350
351/** Converts DOM Events to a Source given an `HTMLElement` and an event's name.
352 * @param element - The {@link HTMLElement} to listen to.
353 * @param event - The DOM Event name to listen to.
354 * @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM.
355 *
356 * @remarks
357 * `fromDomEvent` will create a {@link Source} that listens to the given element's events and issues
358 * them as values on the source. This source will only stop when it's cancelled by a
359 * {@link TalkbackKind.Close | Close signal}.
360 *
361 * @example
362 * An example printing `'clicked!'` when the given `#root` element is clicked.
363 *
364 * ```ts
365 * const element = document.getElementById('root');
366 * pipe(
367 * fromDomEvent(element, 'click'),
368 * subscribe(() => console.log('clicked!'))
369 * );
370 * ```
371 */
372export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
373 return make(observer => {
374 element.addEventListener(event, observer.next);
375 return () => element.removeEventListener(event, observer.next);
376 });
377}
378
379/** Converts a Promise to a Source that issues the resolving Promise's value and then ends.
380 * @param promise - The promise that will be wrapped.
381 * @returns A {@link Source} issuing the promise's value when it resolves.
382 *
383 * @remarks
384 * `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value
385 * asynchronously and ends immediately after resolving.
386 *
387 * This helper will not handle the promise's exceptions, and will cause uncaught errors if the
388 * promise rejects without a value.
389 *
390 * @example
391 * An example printing `'resolved!'` when the given promise resolves after a tick.
392 *
393 * ```ts
394 * pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log));
395 * ```
396 */
397export function fromPromise<T>(promise: Promise<T>): Source<T> {
398 return make(observer => {
399 promise.then(value => {
400 Promise.resolve(value).then(() => {
401 observer.next(value);
402 observer.complete();
403 });
404 });
405 return teardownPlaceholder;
406 });
407}