1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
2import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
3import { share } from './operators';
4
/** Defers the creation of a Source until the moment it is subscribed to.
 * @param produce - A factory function returning a {@link Source}.
 * @returns A {@link Source} that lazily subscribes to the Source returned by the given factory
 * function.
 *
 * @remarks
 * The moment a {@link Source} is created can differ from the moment it's subscribed to. `lazy`
 * postpones the creation: `produce` is not called when `lazy` itself is called, but once for every
 * subscription, and the subscribing {@link Sink} is handed straight to the freshly produced Source.
 * This is especially useful when the wrapped Source is "hot" and would otherwise start issuing
 * values before a subscriber is listening.
 *
 * @example Creating a {@link Source} that issues the timestamp of its subscription rather than the
 * timestamp of its creation, by wrapping the simple {@link fromValue | `fromValue`} source.
 *
 * ```ts
 * lazy(() => fromValue(Date.now()));
 * ```
 */
export function lazy<T>(produce: () => Source<T>): Source<T> {
  return function lazySource(sink) {
    // Build the underlying source only now that a sink has actually subscribed.
    const source = produce();
    return source(sink);
  };
}
28
/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested.
 *
 * @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper,
 * which calls this helper automatically as needed.
 *
 * @param iterable - An {@link AsyncIterable | `AsyncIterable`}.
 * @returns A {@link Source} issuing values sourced from the Iterable.
 *
 * @remarks
 * `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given
 * {@link AsyncIterable}. This can be used in many interoperability situations, including to consume
 * an async generator function.
 *
 * The iterator is only advanced in response to a {@link TalkbackKind.Pull | Pull signal}. Pulls
 * that arrive while a value is still being awaited or delivered are remembered and served by the
 * already running loop instead of starting a second one.
 *
 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
 * using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception.
 * If the iterator has no `throw` method the exception is rethrown to the caller of the talkback
 * (as a rejected promise, since the talkback is an async function).
 *
 * Once the {@link Sink} has sent a {@link TalkbackKind.Close | Close signal}, no further signals
 * are issued, even if the Close arrives while the iterator is still being awaited.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
 * for the JS Iterable protocol.
 */
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
  return sink => {
    const iterator = iterable[Symbol.asyncIterator]();
    // `ended` stops the pull loop; it is set on exhaustion as well as on Close.
    let ended = false;
    // `closed` is set exclusively by the sink's Close signal, so that code resuming after an
    // `await` can tell a cancellation apart from regular exhaustion.
    let closed = false;
    // `looping` guards against re-entrancy; `pulled` records a pull that the running loop must serve.
    let looping = false;
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(async signal => {
        if (signal === TalkbackKind.Close) {
          ended = closed = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          pulled = true;
        } else {
          try {
            for (pulled = looping = true; pulled && !ended; ) {
              next = await iterator.next();
              // The sink may have closed while we were waiting; it must not receive anything else.
              if (closed) break;
              if (next.done) {
                ended = true;
                if (iterator.return) await iterator.return();
                if (!closed) sink(SignalKind.End);
              } else {
                try {
                  pulled = false;
                  sink(push(next.value));
                } catch (error) {
                  if (iterator.throw) {
                    const recovered = await iterator.throw(error);
                    // Never overwrite the `ended` state established by a Close during the await.
                    if (closed) break;
                    if ((ended = !!recovered.done)) sink(SignalKind.End);
                  } else {
                    throw error;
                  }
                }
              }
            }
          } finally {
            // Always release the re-entrancy guard, so a rethrown sink exception doesn't leave
            // the source permanently unable to answer later pulls.
            looping = false;
          }
        }
      })
    );
  };
}
87
/** Converts an Iterable to a Source that pulls and issues values from it as requested.
 * @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper.
 * @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable`
 * @returns A {@link Source} issuing values sourced from the Iterable.
 *
 * @remarks
 * `fromIterable` will create a {@link Source} that pulls and issues values from a given
 * {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list
 * of values in JS it can be applied to many different JS data types, including a JS Generator
 * function.
 *
 * This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink}
 * has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can
 * therefore also be applied to "infinite" iterables, without a predefined end. Pulls issued
 * synchronously while a value is being pushed are served by the already running loop rather than
 * by recursion.
 *
 * This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the
 * passed object also implements the async iterator protocol.
 *
 * When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
 * using {@link Iterator.throw}, which allows a generator to recover from the exception. If the
 * iterator has no `throw` method (as is the case for array iterators) the exception propagates to
 * the caller of the talkback; the Source stays usable and answers subsequent pulls.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol}
 * for the JS Iterable protocol.
 */
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
  if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);
  return sink => {
    const iterator = iterable[Symbol.iterator]();
    // `ended` stops the pull loop; it is set on exhaustion as well as on Close.
    let ended = false;
    // `looping` guards against re-entrancy; `pulled` records a pull that the running loop must serve.
    let looping = false;
    let pulled = false;
    let next: IteratorResult<T>;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          pulled = true;
        } else {
          try {
            for (pulled = looping = true; pulled && !ended; ) {
              if ((next = iterator.next()).done) {
                ended = true;
                if (iterator.return) iterator.return();
                sink(SignalKind.End);
              } else {
                try {
                  pulled = false;
                  sink(push(next.value));
                } catch (error) {
                  if (iterator.throw) {
                    if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
                  } else {
                    throw error;
                  }
                }
              }
            }
          } finally {
            // Always release the re-entrancy guard. Without this, an exception escaping the loop
            // would leave `looping` set and every later pull would silently be ignored.
            looping = false;
          }
        }
      })
    );
  };
}
152
/** Creates a Source that issues each value of a given array synchronously.
 * @see {@link fromIterable} to which `fromArray` forwards.
 * @param array - The array whose values will be issued one by one.
 * @returns A {@link Source} issuing the array's values.
 *
 * @remarks
 * `fromArray` passes the given JS array on to {@link fromIterable | `fromIterable`} and returns the
 * resulting {@link Source}; how values are issued and how the Source ends is therefore defined by
 * `fromIterable`.
 *
 * @example
 * ```ts
 * fromArray([1, 2, 3]);
 * ```
 */
export const fromArray: <T>(array: T[]) => Source<T> = array => fromIterable(array);
169
/** Creates a Source that issues a single value and ends immediately after.
 * @param value - The value that will be issued.
 * @returns A {@link Source} issuing the single value.
 *
 * @remarks
 * The value is issued in response to the first talkback signal that isn't a
 * {@link TalkbackKind.Close | Close signal}, directly followed by the
 * {@link SignalKind.End | End signal}. Any signal received after that, or after a Close, is
 * ignored.
 *
 * @example
 * ```ts
 * fromValue('test');
 * ```
 */
export function fromValue<T>(value: T): Source<T> {
  return sink => {
    let done = false;
    sink(
      start(signal => {
        // Whatever arrives first settles the source; everything afterwards is a no-op.
        if (done) return;
        done = true;
        if (signal === TalkbackKind.Close) return;
        sink(push(value));
        sink(SignalKind.End);
      })
    );
  };
}
195
/** Creates a new Source from scratch from a passed `subscriber` function.
 * @param subscriber - A callback that is called when the {@link Source} is subscribed to.
 * @returns A {@link Source} created from the `subscriber` parameter.
 *
 * @remarks
 * `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed
 * `subscriber` function when it's subscribed to, before the start signal is handed to the
 * {@link Sink}.
 *
 * The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to
 * issue values on the Source, and {@link Observer.complete} to end the Source. Both become no-ops
 * once the Source has completed or has been cancelled.
 *
 * Your `subscriber` function must return a {@link TeardownFn | teardown function} which is only
 * called when your source is cancelled — not when you invoke `complete` yourself. As this creates a
 * "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function
 * again and create a new source.
 *
 * @example
 *
 * ```ts
 * make(observer => {
 *   const frame = requestAnimationFrame(() => {
 *     observer.next('animate!');
 *   });
 *   return () => {
 *     cancelAnimationFrame(frame);
 *   };
 * });
 * ```
 */
export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> {
  return sink => {
    // Set once the source has either completed or been cancelled.
    let finished = false;

    // The methods close over `sink` and `finished` instead of using `this`, so they keep working
    // when passed around detached from the observer object.
    const observer: Observer<T> = {
      next(value: T) {
        if (finished) return;
        sink(push(value));
      },
      complete() {
        if (finished) return;
        finished = true;
        sink(SignalKind.End);
      },
    };

    const dispose = subscriber(observer);

    sink(
      start(signal => {
        // Only the first Close of a still active source triggers the teardown.
        if (finished || signal !== TalkbackKind.Close) return;
        finished = true;
        dispose();
      })
    );
  };
}
249
/** Creates a new Subject which can be used as an IO event hub.
 * @returns A new {@link Subject}.
 *
 * @remarks
 * `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer}
 * combined in one interface, as the Observer is used to send new signals to the Source. The
 * underlying source is wrapped in {@link share | `share`}, so subscriptions to
 * {@link Subject.source} are meant to share the same underlying signals coming from
 * {@link Subject.next} and {@link Subject.complete}.
 *
 * Calls to {@link Subject.next} and {@link Subject.complete} made before the underlying source has
 * been subscribed to are ignored, since no observer exists yet at that point.
 *
 * @example
 * ```ts
 * const subject = makeSubject();
 * pipe(subject.source, subscribe(console.log));
 * // This will log the string on the above subscription
 * subject.next('hello subject!');
 * ```
 */
export function makeSubject<T>(): Subject<T> {
  // Both stay unset until the underlying source is subscribed to and hands us its observer.
  let emit: Subject<T>['next'] | void;
  let finish: Subject<T>['complete'] | void;

  const source = share(
    make(observer => {
      emit = observer.next;
      finish = observer.complete;
      return teardownPlaceholder;
    })
  );

  return {
    source,
    next(value: T) {
      if (emit) emit(value);
    },
    complete() {
      if (finish) finish();
    },
  };
}
286
/** A {@link Source} that ends without issuing any values.
 * @remarks
 * `empty` is a {@link Source} that issues an {@link SignalKind.End | End signal} in response to
 * the first talkback signal that isn't a {@link TalkbackKind.Close | Close signal}. If it's closed
 * first, it stays silent. Any later signal is ignored.
 *
 * @see {@link never | `never`} for a source that instead never ends.
 */
export const empty: Source<any> = (sink: Sink<any>): void => {
  let done = false;
  sink(
    start(signal => {
      // Whatever arrives first settles the source; everything afterwards is a no-op.
      if (done) return;
      done = true;
      if (signal !== TalkbackKind.Close) sink(SignalKind.End);
    })
  );
};
307
/** A {@link Source} without values that never ends.
 * @remarks
 * `never` is a {@link Source} that hands the {@link Sink} a start signal carrying the shared
 * placeholder talkback and then stays silent: it neither sends values nor ends.
 *
 * @see {@link empty | `empty`} for a source that instead ends immediately.
 */
export const never: Source<any> = (sink: Sink<any>): void => {
  const startSignal = start(talkbackPlaceholder);
  sink(startSignal);
};
317
/** Creates a Source that issues an incrementing integer in intervals.
 * @param ms - The interval in milliseconds.
 * @returns A {@link Source} issuing an incrementing count on each interval.
 *
 * @remarks
 * `interval` will create a {@link Source} that issues an incrementing counter, starting at `0`,
 * each time the `ms` interval expires. Every subscription schedules its own timer and counter.
 *
 * It never completes by itself; the timer is cleared by the teardown function that is returned to
 * {@link make | `make`}.
 *
 * @example
 * An example printing `0`, then `1`, and so on, in intervals of 50ms.
 *
 * ```ts
 * pipe(interval(50), subscribe(console.log));
 * ```
 */
export function interval(ms: number): Source<number> {
  return make(observer => {
    let count = 0;
    const tick = (): void => {
      // Advance the counter before emitting, so it moves on even if the observer throws.
      const current = count;
      count += 1;
      observer.next(current);
    };
    const timer = setInterval(tick, ms);
    return () => clearInterval(timer);
  });
}
342
/** Converts DOM Events to a Source given an `HTMLElement` and an event's name.
 * @param element - The {@link HTMLElement} to listen to.
 * @param event - The DOM Event name to listen to.
 * @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM.
 *
 * @remarks
 * `fromDomEvent` will create a {@link Source} that registers the observer's `next` method as a
 * listener for the given event on the element, so that each event is issued as a value on the
 * source. It never completes by itself; the listener is removed again by the teardown function
 * that is returned to {@link make | `make`}.
 *
 * @example
 * An example printing `'clicked!'` when the given `#root` element is clicked.
 *
 * ```ts
 * const element = document.getElementById('root');
 * pipe(
 *   fromDomEvent(element, 'click'),
 *   subscribe(() => console.log('clicked!'))
 * );
 * ```
 */
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
  return make(observer => {
    // The same `observer.next` reference is used to register and to remove the listener.
    const detach = (): void => element.removeEventListener(event, observer.next);
    element.addEventListener(event, observer.next);
    return detach;
  });
}
370
/** Converts a Promise to a Source that issues the resolving Promise's value and then ends.
 * @param promise - The promise that will be wrapped.
 * @returns A {@link Source} issuing the promise's value when it resolves.
 *
 * @remarks
 * `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value
 * asynchronously and completes directly after issuing it. The value is delivered one additional
 * promise tick after the wrapped promise has resolved.
 *
 * This helper attaches no rejection handler: a rejecting promise is not handled here and will
 * surface as an unhandled rejection.
 *
 * @example
 * An example printing `'resolved!'` when the given promise resolves after a tick.
 *
 * ```ts
 * pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log));
 * ```
 */
export function fromPromise<T>(promise: Promise<T>): Source<T> {
  return make(observer => {
    const onResolved = (value: T): void => {
      const deliver = (): void => {
        observer.next(value);
        observer.complete();
      };
      // Deliberately hop through one more resolved promise before delivering the value.
      Promise.resolve(value).then(deliver);
    };
    promise.then(onResolved);
    return teardownPlaceholder;
  });
}