1import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types';
2import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers';
3
4/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
5 * @param subscriber - A callback function called for each issued value.
6 * @returns A function accepting a {@link Source} and returning a {@link Subscription}.
7 *
8 * @remarks
9 * `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
10 * When a source is passed to the returned funtion, the subscription will start and `subscriber`
11 * will be called for each new value the Source issues. This will also return a {@link Subscription}
12 * object that can cancel the ongoing {@link Source} early.
13 *
14 * @example
15 * ```ts
16 * const subscription = pipe(
17 * fromValue('test'),
18 * subscribe(text => {
19 * console.log(text); // 'test'
20 * })
21 * );
22 * ```
23 */
24export function subscribe<T>(subscriber: (value: T) => void) {
25 return (source: Source<T>): Subscription => {
26 let talkback = talkbackPlaceholder;
27 let ended = false;
28 source(signal => {
29 if (signal === SignalKind.End) {
30 ended = true;
31 } else if (signal.tag === SignalKind.Start) {
32 (talkback = signal[0])(TalkbackKind.Pull);
33 } else if (!ended) {
34 subscriber(signal[0]);
35 talkback(TalkbackKind.Pull);
36 }
37 });
38 return {
39 unsubscribe() {
40 if (!ended) {
41 ended = true;
42 talkback(TalkbackKind.Close);
43 }
44 },
45 };
46 };
47}
48
49/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
50 * @see {@link subscribe} which this helper aliases without returnin a {@link Subscription}.
51 * @param subscriber - A callback function called for each issued value.
52 * @returns A function accepting a {@link Source}.
53 *
54 * @remarks
55 * `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
56 * When a source is passed to the returned funtion, the subscription will start and `subscriber`
57 * will be called for each new value the Source issues. Unlike `subscribe` it will not return a
58 * Subscription object and can't be cancelled early.
59 *
60 * @example
61 * ```ts
62 * pipe(
63 * fromValue('test'),
64 * forEach(text => {
65 * console.log(text); // 'test'
66 * })
67 * ); // undefined
68 * ```
69 */
70export function forEach<T>(subscriber: (value: T) => void) {
71 return (source: Source<T>): void => {
72 subscribe(subscriber)(source);
73 };
74}
75
76/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
77 * @see {@link subscribe} which this helper aliases without accepting parameters or returning a
78 * {@link Subscription | Subscription}.
79 *
80 * @param source - A {@link Source}.
81 *
82 * @remarks
83 * `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting
84 * values cannot be observed and the subscription can't be cancelled, as this helper is purely
85 * intended to start side-effects.
86 *
87 * @example
88 * ```ts
89 * pipe(
90 * lazy(() => {
91 * console.log('test'); // this is called
92 * return fromValue(123); // this is never used
93 * }),
94 * publish
95 * ); // undefined
96 * ```
97 */
98export function publish<T>(source: Source<T>): void {
99 subscribe(_value => {
100 /*noop*/
101 })(source);
102}
103
104const doneResult = { done: true } as IteratorReturnResult<void>;
105
106/** Converts a Source to an AsyncIterable that pulls and issues values from the Source.
107 *
108 * @param source - A {@link Source}.
109 * @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source.
110 *
111 * @remarks
112 * `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given
113 * {@link Source}. This can be used in many interoperability situations, to provide an iterable when
114 * a consumer requires it.
115 *
116 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
117 * for the JS Iterable protocol.
118 *
119 * @example
120 * ```ts
121 * const iterable = toAsyncIterable(fromArray([1, 2, 3]));
122 * for await (const value of iterable) {
123 * console.log(value); // outputs: 1, 2, 3
124 * }
125 * ```
126 */
127export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {
128 const buffer: T[] = [];
129
130 let ended = false;
131 let started = false;
132 let pulled = false;
133 let talkback = talkbackPlaceholder;
134 let next: ((value: IteratorResult<T>) => void) | void;
135
136 return {
137 async next(): Promise<IteratorResult<T>> {
138 if (!started) {
139 started = true;
140 source(signal => {
141 if (ended) {
142 /*noop*/
143 } else if (signal === SignalKind.End) {
144 if (next) next = next(doneResult);
145 ended = true;
146 } else if (signal.tag === SignalKind.Start) {
147 pulled = true;
148 (talkback = signal[0])(TalkbackKind.Pull);
149 } else {
150 pulled = false;
151 if (next) {
152 next = next({ value: signal[0], done: false });
153 } else {
154 buffer.push(signal[0]);
155 }
156 }
157 });
158 }
159
160 if (ended && !buffer.length) {
161 return doneResult;
162 } else if (!ended && !pulled && buffer.length <= 1) {
163 pulled = true;
164 talkback(TalkbackKind.Pull);
165 }
166
167 return buffer.length
168 ? { value: buffer.shift()!, done: false }
169 : new Promise(resolve => (next = resolve));
170 },
171 async return(): Promise<IteratorReturnResult<void>> {
172 if (!ended) next = talkback(TalkbackKind.Close);
173 ended = true;
174 return doneResult;
175 },
176 [asyncIteratorSymbol()](): SourceIterable<T> {
177 return this;
178 },
179 };
180};
181
182/** Subscribes to a given source and collects all synchronous values into an array.
183 * @param source - A {@link Source}.
184 * @returns An array of values collected from the {@link Source}.
185 *
186 * @remarks
187 * `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from
188 * this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives
189 * and expects the Source to recursively issue values.
190 *
191 * Any asynchronously issued values will not be
192 * added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before
193 * returning the array.
194 *
195 * @example
196 * ```ts
197 * toArray(fromArray([1, 2, 3])); // [1, 2, 3]
198 * ```
199 */
200export function toArray<T>(source: Source<T>): T[] {
201 const values: T[] = [];
202 let talkback = talkbackPlaceholder;
203 let ended = false;
204 source(signal => {
205 if (signal === SignalKind.End) {
206 ended = true;
207 } else if (signal.tag === SignalKind.Start) {
208 (talkback = signal[0])(TalkbackKind.Pull);
209 } else {
210 values.push(signal[0]);
211 talkback(TalkbackKind.Pull);
212 }
213 });
214 if (!ended) talkback(TalkbackKind.Close);
215 return values;
216}
217
218/** Subscribes to a given source and returns a Promise that will resolve with the last value the
219 * source issues.
220 *
221 * @param source - A {@link Source}.
222 * @returns A {@link Promise} resolving to the last value of the {@link Source}.
223 *
224 * @remarks
225 * `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once
226 * it receives the last value, as signaled by the {@link SignalKind.End | End signal}.
227 *
228 * To keep its implementation simple, padding sources that don't issue any values to `toPromise` is
229 * undefined behaviour and `toPromise` will issue `undefined` in that case.
230 *
231 * The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`.
232 *
233 * @example
234 * ```ts
235 * toPromise(fromValue('test')); // resolves: 'test'
236 * ```
237 */
238export function toPromise<T>(source: Source<T>): Promise<T> {
239 return new Promise(resolve => {
240 let talkback = talkbackPlaceholder;
241 let value: T | void;
242 source(signal => {
243 if (signal === SignalKind.End) {
244 Promise.resolve(value!).then(resolve);
245 } else if (signal.tag === SignalKind.Start) {
246 (talkback = signal[0])(TalkbackKind.Pull);
247 } else {
248 value = signal[0];
249 talkback(TalkbackKind.Pull);
250 }
251 });
252 });
253}