Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

fix: Improve compatibility of AsyncIterable helpers for polyfills (Babel/Hermes related) (#165)

+5
.changeset/yellow-hounds-heal.md
···
···
+
---
+
'wonka': patch
+
---
+
+
Improve compatibility of `fromAsyncIterable` and `toAsyncIterable`. The `toAsyncIterable` will now output an object that's both an `AsyncIterator` and an `AsyncIterable`. Both helpers will now use a polyfill for `Symbol.asyncIterator` to improve compatibility with the Hermes engine and Babel transpilation.
+10 -7
src/__tests__/sinks.test.ts
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
expect(pulls).toBe(1);
-
sink!(push(0));
-
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
-
expect(pulls).toBe(2);
sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
-
expect(pulls).toBe(3);
sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
-
expect(pulls).toBe(3);
});
it('buffers actively pushed values', async () => {
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
sink!(push(0));
sink!(push(1));
sink!(SignalKind.End);
expect(pulls).toBe(1);
-
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
expect(pulls).toBe(1);
let resolved = false;
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
sink!(push(0));
-
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
expect(await asyncIterator.return!()).toEqual({ done: true });
sink!(push(1));
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
const next$ = asyncIterator.next();
+
sink!(push(0));
+
expect(await next$).toEqual({ value: 0, done: false });
expect(pulls).toBe(1);
sink!(push(1));
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
+
expect(pulls).toBe(2);
sink!(SignalKind.End);
expect(await asyncIterator.next()).toEqual({ done: true });
+
expect(pulls).toBe(2);
});
it('buffers actively pushed values', async () => {
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
const next$ = asyncIterator.next();
sink!(push(0));
sink!(push(1));
sink!(SignalKind.End);
expect(pulls).toBe(1);
+
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
expect(await asyncIterator.next()).toEqual({ done: true });
});
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
asyncIterator.next();
expect(pulls).toBe(1);
let resolved = false;
···
};
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
const next$ = asyncIterator.next();
sink!(push(0));
+
expect(await next$).toEqual({ value: 0, done: false });
expect(await asyncIterator.return!()).toEqual({ done: true });
sink!(push(1));
+27
src/helpers.ts
···
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
* @see {@link TeardownFn} for the definition and usage of teardowns.
* @internal
···
0: value,
} as Push<T>;
}
···
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
+
declare global {
+
interface SymbolConstructor {
+
readonly observable: symbol;
+
}
+
}
+
/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
* @see {@link TeardownFn} for the definition and usage of teardowns.
* @internal
···
0: value,
} as Push<T>;
}
+
+
/** Returns the well-known symbol specifying the default AsyncIterator.
+
* @internal
+
*/
+
export const asyncIteratorSymbol = (): typeof Symbol.asyncIterator =>
+
(typeof Symbol === 'function' && Symbol.asyncIterator) || ('@@asyncIterator' as any);
+
+
/** Returns the well-known symbol specifying the default ES Observable.
+
* @privateRemarks
+
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
+
* that abides by the default Observable implementation must carry a method set to this well-known
+
* symbol that returns the Observable implementation. It's common for this object to be an
+
* Observable itself and return itself on this method.
+
*
+
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
+
* between Observable implementations.
+
*
+
* @internal
+
*/
+
export const observableSymbol = (): typeof Symbol.observable =>
+
(typeof Symbol === 'function' && Symbol.observable) || ('@@observable' as any);
+1 -22
src/observable.ts
···
import { Source, SignalKind, TalkbackKind } from './types';
-
import { push, start, talkbackPlaceholder } from './helpers';
-
-
declare global {
-
interface SymbolConstructor {
-
readonly observable: symbol;
-
}
-
}
/** A definition of the ES Observable Subscription type that is returned by
* {@link Observable.subscribe}
···
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable](): Observable<T>;
}
-
-
/** Returns the well-known symbol specifying the default ES Observable.
-
* @privateRemarks
-
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
-
* that abides by the default Observable implementation must carry a method set to this well-known
-
* symbol that returns the Observable implementation. It's common for this object to be an
-
* Observable itself and return itself on this method.
-
*
-
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
-
* between Observable implementations.
-
*
-
* @internal
-
*/
-
const observableSymbol = (): typeof Symbol.observable =>
-
Symbol.observable || ('@@observable' as any);
/** Converts an ES Observable to a {@link Source}.
* @param input - The {@link ObservableLike} object that will be converted.
···
import { Source, SignalKind, TalkbackKind } from './types';
+
import { push, start, talkbackPlaceholder, observableSymbol } from './helpers';
/** A definition of the ES Observable Subscription type that is returned by
* {@link Observable.subscribe}
···
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable](): Observable<T>;
}
/** Converts an ES Observable to a {@link Source}.
* @param input - The {@link ObservableLike} object that will be converted.
+51 -40
src/sinks.ts
···
-
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
-
import { talkbackPlaceholder } from './helpers';
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @param subscriber - A callback function called for each issued value.
···
* }
* ```
*/
-
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
-
[Symbol.asyncIterator](): AsyncIterator<T> {
-
const buffer: T[] = [];
-
let ended = false;
-
let talkback = talkbackPlaceholder;
-
let next: ((value: IteratorResult<T>) => void) | void;
-
source(signal => {
-
if (ended) {
-
/*noop*/
-
} else if (signal === SignalKind.End) {
-
if (next) next = next(doneResult);
-
ended = true;
-
} else if (signal.tag === SignalKind.Start) {
-
(talkback = signal[0])(TalkbackKind.Pull);
-
} else if (next) {
-
next = next({ value: signal[0], done: false });
-
} else {
-
buffer.push(signal[0]);
}
-
});
-
return {
-
async next(): Promise<IteratorResult<T>> {
-
if (ended && !buffer.length) {
-
return doneResult;
-
} else if (!ended && buffer.length <= 1) {
-
talkback(TalkbackKind.Pull);
-
}
-
return buffer.length
-
? { value: buffer.shift()!, done: false }
-
: new Promise(resolve => (next = resolve));
-
},
-
async return(): Promise<IteratorReturnResult<void>> {
-
if (!ended) next = talkback(TalkbackKind.Close);
-
ended = true;
-
return doneResult;
-
},
-
};
-
},
-
});
/** Subscribes to a given source and collects all synchronous values into an array.
* @param source - A {@link Source}.
···
+
import { Source, Subscription, TalkbackKind, SignalKind, SourceIterable } from './types';
+
import { talkbackPlaceholder, asyncIteratorSymbol } from './helpers';
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
* @param subscriber - A callback function called for each issued value.
···
* }
* ```
*/
+
export const toAsyncIterable = <T>(source: Source<T>): SourceIterable<T> => {
+
const buffer: T[] = [];
+
let ended = false;
+
let started = false;
+
let pulled = false;
+
let talkback = talkbackPlaceholder;
+
let next: ((value: IteratorResult<T>) => void) | void;
+
return {
+
async next(): Promise<IteratorResult<T>> {
+
if (!started) {
+
started = true;
+
source(signal => {
+
if (ended) {
+
/*noop*/
+
} else if (signal === SignalKind.End) {
+
if (next) next = next(doneResult);
+
ended = true;
+
} else if (signal.tag === SignalKind.Start) {
+
pulled = true;
+
(talkback = signal[0])(TalkbackKind.Pull);
+
} else {
+
pulled = false;
+
if (next) {
+
next = next({ value: signal[0], done: false });
+
} else {
+
buffer.push(signal[0]);
+
}
+
}
+
});
}
+
if (ended && !buffer.length) {
+
return doneResult;
+
} else if (!ended && !pulled && buffer.length <= 1) {
+
pulled = true;
+
talkback(TalkbackKind.Pull);
+
}
+
return buffer.length
+
? { value: buffer.shift()!, done: false }
+
: new Promise(resolve => (next = resolve));
+
},
+
async return(): Promise<IteratorReturnResult<void>> {
+
if (!ended) next = talkback(TalkbackKind.Close);
+
ended = true;
+
return doneResult;
+
},
+
[asyncIteratorSymbol()](): SourceIterable<T> {
+
return this;
+
},
+
};
+
};
/** Subscribes to a given source and collects all synchronous values into an array.
* @param source - A {@link Source}.
+11 -3
src/sources.ts
···
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
-
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';
/** Helper creating a Source from a factory function when it's subscribed to.
···
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*/
-
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
return sink => {
-
const iterator = iterable[Symbol.asyncIterator]();
let ended = false;
let looping = false;
let pulled = false;
···
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
+
import {
+
push,
+
start,
+
talkbackPlaceholder,
+
teardownPlaceholder,
+
asyncIteratorSymbol,
+
} from './helpers';
import { share } from './operators';
/** Helper creating a Source from a factory function when it's subscribed to.
···
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
* for the JS Iterable protocol.
*/
+
export function fromAsyncIterable<T>(iterable: AsyncIterable<T> | AsyncIterator<T>): Source<T> {
return sink => {
+
const iterator: AsyncIterator<T> =
+
(iterable[asyncIteratorSymbol()] && iterable[asyncIteratorSymbol()]()) || iterable;
+
let ended = false;
let looping = false;
let pulled = false;
+5
src/types.d.ts
···
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}
···
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}
+
+
/** Async Iterable/Iterator after having converted a {@link Source}.
+
* @see {@link toAsyncIterable} for a helper that creates this structure.
+
*/
+
export interface SourceIterable<T> extends AsyncIterator<T>, AsyncIterable<T> {}