1import { Source, Subscription, TalkbackKind, SignalKind } from './types';
2import { talkbackPlaceholder } from './helpers';
3
4/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
5 * @param subscriber - A callback function called for each issued value.
6 * @returns A function accepting a {@link Source} and returning a {@link Subscription}.
7 *
8 * @remarks
9 * `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
10 * When a source is passed to the returned funtion, the subscription will start and `subscriber`
11 * will be called for each new value the Source issues. This will also return a {@link Subscription}
12 * object that can cancel the ongoing {@link Source} early.
13 *
14 * @example
15 * ```ts
16 * const subscription = pipe(
17 * fromValue('test'),
18 * subscribe(text => {
19 * console.log(text); // 'test'
20 * })
21 * );
22 * ```
23 */
24export function subscribe<T>(subscriber: (value: T) => void) {
25 return (source: Source<T>): Subscription => {
26 let talkback = talkbackPlaceholder;
27 let ended = false;
28 source(signal => {
29 if (signal === SignalKind.End) {
30 ended = true;
31 } else if (signal.tag === SignalKind.Start) {
32 (talkback = signal[0])(TalkbackKind.Pull);
33 } else if (!ended) {
34 subscriber(signal[0]);
35 talkback(TalkbackKind.Pull);
36 }
37 });
38 return {
39 unsubscribe() {
40 if (!ended) {
41 ended = true;
42 talkback(TalkbackKind.Close);
43 }
44 },
45 };
46 };
47}
48
49/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
50 * @see {@link subscribe} which this helper aliases without returnin a {@link Subscription}.
51 * @param subscriber - A callback function called for each issued value.
52 * @returns A function accepting a {@link Source}.
53 *
54 * @remarks
55 * `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
56 * When a source is passed to the returned funtion, the subscription will start and `subscriber`
57 * will be called for each new value the Source issues. Unlike `subscribe` it will not return a
58 * Subscription object and can't be cancelled early.
59 *
60 * @example
61 * ```ts
62 * pipe(
63 * fromValue('test'),
64 * forEach(text => {
65 * console.log(text); // 'test'
66 * })
67 * ); // undefined
68 * ```
69 */
70export function forEach<T>(subscriber: (value: T) => void) {
71 return (source: Source<T>): void => {
72 subscribe(subscriber)(source);
73 };
74}
75
76/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
77 * @see {@link subscribe} which this helper aliases without accepting parameters or returning a
78 * {@link Subscription | Subscription}.
79 *
80 * @param source - A {@link Source}.
81 *
82 * @remarks
83 * `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting
84 * values cannot be observed and the subscription can't be cancelled, as this helper is purely
85 * intended to start side-effects.
86 *
87 * @example
88 * ```ts
89 * pipe(
90 * lazy(() => {
91 * console.log('test'); // this is called
92 * return fromValue(123); // this is never used
93 * }),
94 * publish
95 * ); // undefined
96 * ```
97 */
98export function publish<T>(source: Source<T>): void {
99 subscribe(_value => {
100 /*noop*/
101 })(source);
102}
103
104const doneResult = { done: true } as IteratorReturnResult<void>;
105
106/** Converts a Source to an AsyncIterable that pulls and issues values from the Source.
107 *
108 * @param source - A {@link Source}.
109 * @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source.
110 *
111 * @remarks
112 * `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given
113 * {@link Source}. This can be used in many interoperability situations, to provide an iterable when
114 * a consumer requires it.
115 *
116 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
117 * for the JS Iterable protocol.
118 *
119 * @example
120 * ```ts
121 * const iterable = toAsyncIterable(fromArray([1, 2, 3]));
122 * for await (const value of iterable) {
123 * console.log(value); // outputs: 1, 2, 3
124 * }
125 * ```
126 */
127export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
128 [Symbol.asyncIterator](): AsyncIterator<T> {
129 const buffer: T[] = [];
130
131 let ended = false;
132 let talkback = talkbackPlaceholder;
133 let next: ((value: IteratorResult<T>) => void) | void;
134
135 source(signal => {
136 if (ended) {
137 /*noop*/
138 } else if (signal === SignalKind.End) {
139 if (next) next = next(doneResult);
140 ended = true;
141 } else if (signal.tag === SignalKind.Start) {
142 (talkback = signal[0])(TalkbackKind.Pull);
143 } else if (next) {
144 next = next({ value: signal[0], done: false });
145 } else {
146 buffer.push(signal[0]);
147 }
148 });
149
150 return {
151 async next(): Promise<IteratorResult<T>> {
152 if (ended && !buffer.length) {
153 return doneResult;
154 } else if (!ended && buffer.length <= 1) {
155 talkback(TalkbackKind.Pull);
156 }
157
158 return buffer.length
159 ? { value: buffer.shift()!, done: false }
160 : new Promise(resolve => (next = resolve));
161 },
162 async return(): Promise<IteratorReturnResult<void>> {
163 if (!ended) next = talkback(TalkbackKind.Close);
164 ended = true;
165 return doneResult;
166 },
167 };
168 },
169});
170
171/** Subscribes to a given source and collects all synchronous values into an array.
172 * @param source - A {@link Source}.
173 * @returns An array of values collected from the {@link Source}.
174 *
175 * @remarks
176 * `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from
177 * this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives
178 * and expects the Source to recursively issue values.
179 *
180 * Any asynchronously issued values will not be
181 * added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before
182 * returning the array.
183 *
184 * @example
185 * ```ts
186 * toArray(fromArray([1, 2, 3])); // [1, 2, 3]
187 * ```
188 */
189export function toArray<T>(source: Source<T>): T[] {
190 const values: T[] = [];
191 let talkback = talkbackPlaceholder;
192 let ended = false;
193 source(signal => {
194 if (signal === SignalKind.End) {
195 ended = true;
196 } else if (signal.tag === SignalKind.Start) {
197 (talkback = signal[0])(TalkbackKind.Pull);
198 } else {
199 values.push(signal[0]);
200 talkback(TalkbackKind.Pull);
201 }
202 });
203 if (!ended) talkback(TalkbackKind.Close);
204 return values;
205}
206
207/** Subscribes to a given source and returns a Promise that will resolve with the last value the
208 * source issues.
209 *
210 * @param source - A {@link Source}.
211 * @returns A {@link Promise} resolving to the last value of the {@link Source}.
212 *
213 * @remarks
214 * `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once
215 * it receives the last value, as signaled by the {@link SignalKind.End | End signal}.
216 *
217 * To keep its implementation simple, padding sources that don't issue any values to `toPromise` is
218 * undefined behaviour and `toPromise` will issue `undefined` in that case.
219 *
220 * The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`.
221 *
222 * @example
223 * ```ts
224 * toPromise(fromValue('test')); // resolves: 'test'
225 * ```
226 */
227export function toPromise<T>(source: Source<T>): Promise<T> {
228 return new Promise(resolve => {
229 let talkback = talkbackPlaceholder;
230 let value: T | void;
231 source(signal => {
232 if (signal === SignalKind.End) {
233 Promise.resolve(value!).then(resolve);
234 } else if (signal.tag === SignalKind.Start) {
235 (talkback = signal[0])(TalkbackKind.Pull);
236 } else {
237 value = signal[0];
238 talkback(TalkbackKind.Pull);
239 }
240 });
241 });
242}