Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.3.2 7.9 kB view raw
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
 * @param subscriber - A callback function called for each issued value.
 * @returns A function accepting a {@link Source} and returning a {@link Subscription}.
 *
 * @remarks
 * `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
 * When a source is passed to the returned function, the subscription will start and `subscriber`
 * will be called for each new value the Source issues. This will also return a {@link Subscription}
 * object that can cancel the ongoing {@link Source} early.
 *
 * @example
 * ```ts
 * const subscription = pipe(
 *   fromValue('test'),
 *   subscribe(text => {
 *     console.log(text); // 'test'
 *   })
 * );
 * ```
 */
export function subscribe<T>(subscriber: (value: T) => void) {
  return (source: Source<T>): Subscription => {
    // Stand-in talkback used until the source hands over its own with the Start signal.
    let talkback = talkbackPlaceholder;
    // Set once the source ended or the consumer unsubscribed; values are dropped afterwards.
    let ended = false;
    source(signal => {
      if (signal === SignalKind.End) {
        ended = true;
      } else if (signal.tag === SignalKind.Start) {
        // Store the source's talkback and immediately request the first value.
        (talkback = signal[0])(TalkbackKind.Pull);
      } else if (!ended) {
        subscriber(signal[0]);
        // Request the next value only after the subscriber has handled this one.
        talkback(TalkbackKind.Pull);
      }
    });
    return {
      unsubscribe() {
        // Close at most once, and never after the source has already ended.
        if (!ended) {
          ended = true;
          talkback(TalkbackKind.Close);
        }
      },
    };
  };
}

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
 * @see {@link subscribe} which this helper aliases without returning a {@link Subscription}.
 * @param subscriber - A callback function called for each issued value.
 * @returns A function accepting a {@link Source}.
 *
 * @remarks
 * `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
 * When a source is passed to the returned function, the subscription will start and `subscriber`
 * will be called for each new value the Source issues. Unlike `subscribe` it will not return a
 * Subscription object and can't be cancelled early.
 *
 * @example
 * ```ts
 * pipe(
 *   fromValue('test'),
 *   forEach(text => {
 *     console.log(text); // 'test'
 *   })
 * ); // undefined
 * ```
 */
export function forEach<T>(subscriber: (value: T) => void) {
  return (source: Source<T>): void => {
    // Delegates to `subscribe` and deliberately discards the returned Subscription.
    subscribe(subscriber)(source);
  };
}

/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
 * @see {@link subscribe} which this helper aliases without accepting parameters or returning a
 * {@link Subscription | Subscription}.
 *
 * @param source - A {@link Source}.
 *
 * @remarks
 * `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting
 * values cannot be observed and the subscription can't be cancelled, as this helper is purely
 * intended to start side-effects.
 *
 * @example
 * ```ts
 * pipe(
 *   lazy(() => {
 *     console.log('test'); // this is called
 *     return fromValue(123); // this is never used
 *   }),
 *   publish
 * ); // undefined
 * ```
 */
export function publish<T>(source: Source<T>): void {
  subscribe(_value => {
    /*noop*/
  })(source);
}

/** Shared "iteration finished" result returned by every completed async iterator below. */
const doneResult = { done: true } as IteratorReturnResult<void>;

/** Converts a Source to an AsyncIterable that pulls and issues values from the Source.
 *
 * @param source - A {@link Source}.
 * @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source.
 *
 * @remarks
 * `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given
 * {@link Source}. This can be used in many interoperability situations, to provide an iterable when
 * a consumer requires it.
 *
 * The source is subscribed to each time `[Symbol.asyncIterator]()` is called, not when
 * `toAsyncIterable` itself is called.
 *
 * @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
 * for the JS Iterable protocol.
 *
 * @example
 * ```ts
 * const iterable = toAsyncIterable(fromArray([1, 2, 3]));
 * for await (const value of iterable) {
 *   console.log(value); // outputs: 1, 2, 3
 * }
 * ```
 */
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
  [Symbol.asyncIterator](): AsyncIterator<T> {
    // Values that arrived while no `next()` call was waiting for them.
    const buffer: T[] = [];

    let ended = false;
    let talkback = talkbackPlaceholder;
    // Resolver of the `next()` promise that is currently waiting for a value, if any.
    let next: ((value: IteratorResult<T>) => void) | void;

    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        // Settle a waiting `next()` with "done"; assigning the resolver's void result clears `next`.
        if (next) next = next(doneResult);
        ended = true;
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
      } else if (next) {
        // Hand the value straight to the waiting `next()` call and clear the resolver.
        next = next({ value: signal[0], done: false });
      } else {
        buffer.push(signal[0]);
      }
    });

    return {
      async next(): Promise<IteratorResult<T>> {
        if (ended && !buffer.length) {
          return doneResult;
        } else if (!ended && buffer.length <= 1) {
          // The buffer is (nearly) drained, so ask the source for another value.
          talkback(TalkbackKind.Pull);
        }

        return buffer.length
          ? { value: buffer.shift()!, done: false }
          : new Promise(resolve => (next = resolve));
      },
      async return(): Promise<IteratorReturnResult<void>> {
        // Closes the source and overwrites `next` with the talkback's return value.
        // NOTE(review): presumably that value is `undefined`, in which case a `next()` promise
        // still pending at this point is never settled - confirm whether that is intended.
        if (!ended) next = talkback(TalkbackKind.Close);
        ended = true;
        return doneResult;
      },
    };
  },
});

/** Subscribes to a given source and collects all synchronous values into an array.
 * @param source - A {@link Source}.
 * @returns An array of values collected from the {@link Source}.
 *
 * @remarks
 * `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from
 * this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives
 * and expects the Source to recursively issue values.
 *
 * Any asynchronously issued values will not be
 * added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before
 * returning the array. Values a source still issues after it ended or was closed are ignored, and
 * a source that only starts after `toArray` has returned is closed immediately.
 *
 * @example
 * ```ts
 * toArray(fromArray([1, 2, 3])); // [1, 2, 3]
 * ```
 */
export function toArray<T>(source: Source<T>): T[] {
  const values: T[] = [];
  let talkback = talkbackPlaceholder;
  // Set once the source ended or this sink closed it; nothing is collected afterwards.
  let ended = false;
  source(signal => {
    if (signal === SignalKind.End) {
      ended = true;
    } else if (signal.tag === SignalKind.Start) {
      // A Start that arrives after `toArray` returned can no longer contribute values, so the
      // source is closed right away instead of being pulled and left running.
      (talkback = signal[0])(ended ? TalkbackKind.Close : TalkbackKind.Pull);
    } else if (!ended) {
      values.push(signal[0]);
      talkback(TalkbackKind.Pull);
    }
  });
  if (!ended) {
    // Mark as ended before closing so that late values cannot mutate the returned array.
    ended = true;
    talkback(TalkbackKind.Close);
  }
  return values;
}

/** Subscribes to a given source and returns a Promise that will resolve with the last value the
 * source issues.
 *
 * @param source - A {@link Source}.
 * @returns A {@link Promise} resolving to the last value of the {@link Source}.
 *
 * @remarks
 * `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once
 * it receives the last value, as signaled by the {@link SignalKind.End | End signal}.
 *
 * To keep its implementation simple, passing sources that don't issue any values to `toPromise` is
 * undefined behaviour and `toPromise` will issue `undefined` in that case.
 *
 * The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`.
 *
 * @example
 * ```ts
 * toPromise(fromValue('test')); // resolves: 'test'
 * ```
 */
export function toPromise<T>(source: Source<T>): Promise<T> {
  return new Promise(resolve => {
    let talkback = talkbackPlaceholder;
    // Most recent value seen so far; stays `undefined` if the source never issues one.
    let value: T | void;
    source(signal => {
      if (signal === SignalKind.End) {
        Promise.resolve(value!).then(resolve);
      } else if (signal.tag === SignalKind.Start) {
        (talkback = signal[0])(TalkbackKind.Pull);
      } else {
        value = signal[0];
        talkback(TalkbackKind.Pull);
      }
    });
  });
}