Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types'; 2import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers'; 3 4/** Creates a subscription to a given source and invokes a `subscriber` callback for each value. 5 * @param subscriber - A callback function called for each issued value. 6 * @returns A function accepting a {@link Source} and returning a {@link Subscription}. 7 * 8 * @remarks 9 * `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}. 10 * When a source is passed to the returned funtion, the subscription will start and `subscriber` 11 * will be called for each new value the Source issues. This will also return a {@link Subscription} 12 * object that can cancel the ongoing {@link Source} early. 13 * 14 * @example 15 * ```ts 16 * const subscription = pipe( 17 * fromValue('test'), 18 * subscribe(text => { 19 * console.log(text); // 'test' 20 * }) 21 * ); 22 * ``` 23 */ 24export function subscribe<T>(subscriber: (value: T) => void) { 25 return (source: Source<T>): Subscription => { 26 let talkback = talkbackPlaceholder; 27 let ended = false; 28 source(signal => { 29 if (signal === SignalKind.End) { 30 ended = true; 31 } else if (signal.tag === SignalKind.Start) { 32 (talkback = signal[0])(TalkbackKind.Pull); 33 } else if (!ended) { 34 subscriber(signal[0]); 35 talkback(TalkbackKind.Pull); 36 } 37 }); 38 return { 39 unsubscribe() { 40 if (!ended) { 41 ended = true; 42 talkback(TalkbackKind.Close); 43 } 44 }, 45 }; 46 }; 47} 48 49/** Creates a subscription to a given source and invokes a `subscriber` callback for each value. 50 * @see {@link subscribe} which this helper aliases without returnin a {@link Subscription}. 51 * @param subscriber - A callback function called for each issued value. 52 * @returns A function accepting a {@link Source}. 53 * 54 * @remarks 55 * `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}. 56 * When a source is passed to the returned funtion, the subscription will start and `subscriber` 57 * will be called for each new value the Source issues. Unlike `subscribe` it will not return a 58 * Subscription object and can't be cancelled early. 59 * 60 * @example 61 * ```ts 62 * pipe( 63 * fromValue('test'), 64 * forEach(text => { 65 * console.log(text); // 'test' 66 * }) 67 * ); // undefined 68 * ``` 69 */ 70export function forEach<T>(subscriber: (value: T) => void) { 71 return (source: Source<T>): void => { 72 subscribe(subscriber)(source); 73 }; 74} 75 76/** Creates a subscription to a given source and invokes a `subscriber` callback for each value. 77 * @see {@link subscribe} which this helper aliases without accepting parameters or returning a 78 * {@link Subscription | Subscription}. 79 * 80 * @param source - A {@link Source}. 81 * 82 * @remarks 83 * `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting 84 * values cannot be observed and the subscription can't be cancelled, as this helper is purely 85 * intended to start side-effects. 86 * 87 * @example 88 * ```ts 89 * pipe( 90 * lazy(() => { 91 * console.log('test'); // this is called 92 * return fromValue(123); // this is never used 93 * }), 94 * publish 95 * ); // undefined 96 * ``` 97 */ 98export function publish<T>(source: Source<T>): void { 99 subscribe(_value => { 100 /*noop*/ 101 })(source); 102} 103 104const doneResult = { done: true } as IteratorReturnResult<void>; 105 106/** Converts a Source to an AsyncIterable that pulls and issues values from the Source. 107 * 108 * @param source - A {@link Source}. 109 * @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source. 110 * 111 * @remarks 112 * `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given 113 * {@link Source}. This can be used in many interoperability situations, to provide an iterable when 114 * a consumer requires it. 115 * 116 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols} 117 * for the JS Iterable protocol. 118 * 119 * @example 120 * ```ts 121 * const iterable = toAsyncIterable(fromArray([1, 2, 3])); 122 * for await (const value of iterable) { 123 * console.log(value); // outputs: 1, 2, 3 124 * } 125 * ``` 126 */ 127export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => { 128 const buffer: T[] = []; 129 130 let ended = false; 131 let started = false; 132 let pulled = false; 133 let talkback = talkbackPlaceholder; 134 let next: ((value: IteratorResult<T>) => void) | void; 135 136 return { 137 async next(): Promise<IteratorResult<T>> { 138 if (!started) { 139 started = true; 140 source(signal => { 141 if (ended) { 142 /*noop*/ 143 } else if (signal === SignalKind.End) { 144 if (next) next = next(doneResult); 145 ended = true; 146 } else if (signal.tag === SignalKind.Start) { 147 pulled = true; 148 (talkback = signal[0])(TalkbackKind.Pull); 149 } else { 150 pulled = false; 151 if (next) { 152 next = next({ value: signal[0], done: false }); 153 } else { 154 buffer.push(signal[0]); 155 } 156 } 157 }); 158 } 159 160 if (ended && !buffer.length) { 161 return doneResult; 162 } else if (!ended && !pulled && buffer.length <= 1) { 163 pulled = true; 164 talkback(TalkbackKind.Pull); 165 } 166 167 return buffer.length 168 ? { value: buffer.shift()!, done: false } 169 : new Promise(resolve => (next = resolve)); 170 }, 171 async return(): Promise<IteratorReturnResult<void>> { 172 if (!ended) next = talkback(TalkbackKind.Close); 173 ended = true; 174 return doneResult; 175 }, 176 [asyncIteratorSymbol()](): SourceIterable<T> { 177 return this; 178 }, 179 }; 180}; 181 182/** Subscribes to a given source and collects all synchronous values into an array. 183 * @param source - A {@link Source}. 184 * @returns An array of values collected from the {@link Source}. 185 * 186 * @remarks 187 * `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from 188 * this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives 189 * and expects the Source to recursively issue values. 190 * 191 * Any asynchronously issued values will not be 192 * added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before 193 * returning the array. 194 * 195 * @example 196 * ```ts 197 * toArray(fromArray([1, 2, 3])); // [1, 2, 3] 198 * ``` 199 */ 200export function toArray<T>(source: Source<T>): T[] { 201 const values: T[] = []; 202 let talkback = talkbackPlaceholder; 203 let ended = false; 204 source(signal => { 205 if (signal === SignalKind.End) { 206 ended = true; 207 } else if (signal.tag === SignalKind.Start) { 208 (talkback = signal[0])(TalkbackKind.Pull); 209 } else { 210 values.push(signal[0]); 211 talkback(TalkbackKind.Pull); 212 } 213 }); 214 if (!ended) talkback(TalkbackKind.Close); 215 return values; 216} 217 218/** Subscribes to a given source and returns a Promise that will resolve with the last value the 219 * source issues. 220 * 221 * @param source - A {@link Source}. 222 * @returns A {@link Promise} resolving to the last value of the {@link Source}. 223 * 224 * @remarks 225 * `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once 226 * it receives the last value, as signaled by the {@link SignalKind.End | End signal}. 227 * 228 * To keep its implementation simple, padding sources that don't issue any values to `toPromise` is 229 * undefined behaviour and `toPromise` will issue `undefined` in that case. 230 * 231 * The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`. 232 * 233 * @example 234 * ```ts 235 * toPromise(fromValue('test')); // resolves: 'test' 236 * ``` 237 */ 238export function toPromise<T>(source: Source<T>): Promise<T> { 239 return new Promise(resolve => { 240 let talkback = talkbackPlaceholder; 241 let value: T | void; 242 source(signal => { 243 if (signal === SignalKind.End) { 244 Promise.resolve(value!).then(resolve); 245 } else if (signal.tag === SignalKind.Start) { 246 (talkback = signal[0])(TalkbackKind.Pull); 247 } else { 248 value = signal[0]; 249 talkback(TalkbackKind.Pull); 250 } 251 }); 252 }); 253}