Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

feat: Add TSDocs to all internal and external utilities and types (#136)

+5
.changeset/violet-suits-yawn.md
···
···
+
---
+
'wonka': minor
+
---
+
+
Add extensive TSDoc documentation for all `wonka` internals and exports. This will replace the documentation and give consumers more guidance on each of the library's extensive utilities.
+3
.editorconfig
···
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true
···
indent_style = space
insert_final_newline = true
trim_trailing_whitespace = true
+
+
[*.ts]
+
max_line_length = 100
+1
package.json
···
"eslint": "^8.29.0",
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-prettier": "^4.2.1",
"flowgen": "^1.21.0",
"glob": "^8.0.3",
"husky-v4": "^4.3.8",
···
"eslint": "^8.29.0",
"eslint-config-prettier": "^8.5.0",
"eslint-plugin-prettier": "^4.2.1",
+
"eslint-plugin-tsdoc": "^0.2.17",
"flowgen": "^1.21.0",
"glob": "^8.0.3",
"husky-v4": "^4.3.8",
+33
pnpm-lock.yaml
···
eslint: ^8.29.0
eslint-config-prettier: ^8.5.0
eslint-plugin-prettier: ^4.2.1
flowgen: ^1.21.0
glob: ^8.0.3
husky-v4: ^4.3.8
···
eslint: 8.29.0
eslint-config-prettier: 8.5.0_eslint@8.29.0
eslint-plugin-prettier: 4.2.1_nrhoyyjffvfyk4vtlt5destxgm
flowgen: 1.21.0
glob: 8.0.3
husky-v4: 4.3.8
···
fs-extra: 8.1.0
globby: 11.1.0
read-yaml-file: 1.1.0
dev: true
/@nodelib/fs.scandir/2.1.5:
···
prettier-linter-helpers: 1.0.0
dev: true
/eslint-scope/5.1.1:
resolution: {integrity: sha512-2NxwbF/hZ0KpepYN0cNbo+FN6XoK7GaHlQhgx/hIZl6Va0bF45RQOOwhLIy8lQDbuCiadSLCBnH2CFYquit5bw==}
engines: {node: '>=8.0.0'}
···
/isexe/2.0.0:
resolution: {integrity: sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==}
dev: true
/js-sdsl/4.2.0:
···
/resolve-from/5.0.0:
resolution: {integrity: sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==}
engines: {node: '>=8'}
dev: true
/resolve/1.22.1:
···
eslint: ^8.29.0
eslint-config-prettier: ^8.5.0
eslint-plugin-prettier: ^4.2.1
+
eslint-plugin-tsdoc: ^0.2.17
flowgen: ^1.21.0
glob: ^8.0.3
husky-v4: ^4.3.8
···
eslint: 8.29.0
eslint-config-prettier: 8.5.0_eslint@8.29.0
eslint-plugin-prettier: 4.2.1_nrhoyyjffvfyk4vtlt5destxgm
+
eslint-plugin-tsdoc: 0.2.17
flowgen: 1.21.0
glob: 8.0.3
husky-v4: 4.3.8
···
fs-extra: 8.1.0
globby: 11.1.0
read-yaml-file: 1.1.0
+
dev: true
+
+
/@microsoft/tsdoc-config/0.16.2:
+
resolution: {integrity: sha512-OGiIzzoBLgWWR0UdRJX98oYO+XKGf7tiK4Zk6tQ/E4IJqGCe7dvkTvgDZV5cFJUzLGDOjeAXrnZoA6QkVySuxw==}
+
dependencies:
+
'@microsoft/tsdoc': 0.14.2
+
ajv: 6.12.6
+
jju: 1.4.0
+
resolve: 1.19.0
+
dev: true
+
+
/@microsoft/tsdoc/0.14.2:
+
resolution: {integrity: sha512-9b8mPpKrfeGRuhFH5iO1iwCLeIIsV6+H1sRfxbkoGXIyQE2BTsPd9zqSqQJ+pv5sJ/hT5M1zvOFL02MnEezFug==}
dev: true
/@nodelib/fs.scandir/2.1.5:
···
prettier-linter-helpers: 1.0.0
dev: true
+
/eslint-plugin-tsdoc/0.2.17:
+
resolution: {integrity: sha512-xRmVi7Zx44lOBuYqG8vzTXuL6IdGOeF9nHX17bjJ8+VE6fsxpdGem0/SBTmAwgYMKYB1WBkqRJVQ+n8GK041pA==}
+
dependencies:
+
'@microsoft/tsdoc': 0.14.2
+
'@microsoft/tsdoc-config': 0.16.2
+
dev: true
+
/eslint-scope/5.1.1:
resolution: {integrity: sha512-2NxwbF/hZ0KpepYN0cNbo+FN6XoK7GaHlQhgx/hIZl6Va0bF45RQOOwhLIy8lQDbuCiadSLCBnH2CFYquit5bw==}
engines: {node: '>=8.0.0'}
···
/isexe/2.0.0:
resolution: {integrity: sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==}
+
dev: true
+
+
/jju/1.4.0:
+
resolution: {integrity: sha512-8wb9Yw966OSxApiCt0K3yNJL8pnNeIv+OEq2YMidz4FKP6nonSRoOXc80iXY4JaN2FC11B9qsNmDsm+ZOfMROA==}
dev: true
/js-sdsl/4.2.0:
···
/resolve-from/5.0.0:
resolution: {integrity: sha512-qYg9KP24dD5qka9J47d0aVky0N+b4fTU89LN9iDnjB5waksiC49rvMB0PrUJQGoTmH50XPiqOvAjDfaijGxYZw==}
engines: {node: '>=8'}
+
dev: true
+
+
/resolve/1.19.0:
+
resolution: {integrity: sha512-rArEXAgsBG4UgRGcynxWIWKFvh/XZCcS8UJdHhwy91zwAvCZIbcs+vAbflgBnNjYMs/i/i+/Ux6IZhML1yPvxg==}
+
dependencies:
+
is-core-module: 2.11.0
+
path-parse: 1.0.7
dev: true
/resolve/1.22.1:
+2 -1
scripts/eslint-preset.js
···
},
},
extends: ['prettier'],
-
plugins: ['prettier'],
ignorePatterns: ['node_modules/', 'dist/', 'coverage/', 'perf/'],
rules: {
'sort-keys': 'off',
···
'@typescript-eslint/no-empty-function': 'off',
'@typescript-eslint/no-unused-vars': 'off',
'prefer-rest-params': 'off',
},
},
],
···
},
},
extends: ['prettier'],
+
plugins: ['prettier', 'tsdoc'],
ignorePatterns: ['node_modules/', 'dist/', 'coverage/', 'perf/'],
rules: {
'sort-keys': 'off',
···
'@typescript-eslint/no-empty-function': 'off',
'@typescript-eslint/no-unused-vars': 'off',
'prefer-rest-params': 'off',
+
'tsdoc/syntax': 'error',
},
},
],
+3 -1
scripts/flow-typings-plugin.mjs
···
// NOTE: Computed property names will be omitted
code = code.replace(/\[Symbol\.\w+\][?()]*:(?:.*);\n?/g, '');
-
let flowdef = compiler.compileDefinitionString(code);
flowdef = beautify(flowdef);
flowdef = flowdef.replace(/import/g, 'import type');
···
// NOTE: Computed property names will be omitted
code = code.replace(/\[Symbol\.\w+\][?()]*:(?:.*);\n?/g, '');
+
let flowdef = compiler.compileDefinitionString(code, {
+
jsdoc: false,
+
});
flowdef = beautify(flowdef);
flowdef = flowdef.replace(/import/g, 'import type');
+20
src/callbag.ts
···
import { Source, SignalKind } from './types';
import { push, start } from './helpers';
interface Callbag<I, O> {
(t: 0, d: Callbag<O, I>): void;
(t: 1, d: I): void;
(t: 2, d?: any): void;
}
export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {
return sink => {
callbag(0, (signal: number, data: any) => {
···
};
}
export function toCallbag<T>(source: Source<T>): Callbag<any, T> {
return (signal: number, sink: any) => {
if (signal === 0) {
···
import { Source, SignalKind } from './types';
import { push, start } from './helpers';
+
/** A definition of the Callbag type as per its specification.
+
* @see {@link https://github.com/callbag/callbag} for the Callbag specification.
+
*/
interface Callbag<I, O> {
(t: 0, d: Callbag<O, I>): void;
(t: 1, d: I): void;
(t: 2, d?: any): void;
}
+
/** Converts a Callbag to a {@link Source}.
+
* @param callbag - The {@link Callbag} object that will be converted.
+
* @returns A {@link Source} wrapping the passed Callbag.
+
*
+
* @remarks
+
* This converts a Callbag to a {@link Source}. When this Source receives a {@link Sink} and
+
* the subscription starts, internally, it'll subscribe to the passed Callbag, passing through
+
* all of its emitted values.
+
*/
export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {
return sink => {
callbag(0, (signal: number, data: any) => {
···
};
}
+
/** Converts a {@link Source} to a Callbag.
+
* @param source - The {@link Source} that will be converted.
+
* @returns A {@link Callbag} wrapping the passed Source.
+
*
+
* @remarks
+
* This converts a {@link Source} to a {@link Callbag}. When this Callbag is subscribed to, it
+
* internally subscribes to the Wonka Source and pulls new values.
+
*/
export function toCallbag<T>(source: Source<T>): Callbag<any, T> {
return (signal: number, sink: any) => {
if (signal === 0) {
+64 -9
src/combine.ts
···
? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
: [];
-
export function zip<Sources extends readonly [...Source<any>[]]>(
-
sources: [...Sources]
-
): Source<TypeOfSourceArray<Sources>>;
-
export function zip<Sources extends { [prop: string]: Source<any> }>(
-
sources: Sources
-
): Source<{ [Property in keyof Sources]: TypeOfSource<Sources[Property]> }>;
-
export function zip<T>(
-
sources: Source<T>[] | Record<string, Source<T>>
-
): Source<T[] | Record<string, T>> {
const size = Object.keys(sources).length;
return sink => {
const filled: Set<string | number> = new Set();
···
};
}
export function combine<Sources extends Source<any>[]>(
...sources: Sources
): Source<TypeOfSourceArray<Sources>> {
···
? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
: [];
+
/** Combines the latest values of several sources into a Source issuing either tuple or dictionary
+
* values.
+
*
+
* @param sources - Either an array or dictionary object of Sources.
+
* @returns A {@link Source} issuing a zipped value whenever any input Source updates.
+
*
+
* @remarks
+
* `zip` combines several {@link Source | Sources}. The resulting Source will issue its first value
+
* once all input Sources have at least issued one value, and will subsequently issue a new value
+
* each time any of the Sources emits a new value.
+
*
+
* Depending on whether an array or dictionary object of Sources is passed to `zip`, its emitted
+
* values will be arrays or dictionary objects of the Sources' values.
+
*
+
* @example
+
* An example of passing a dictionary object to `zip`. If an array is passed, the resulting
+
* values will output arrays of the sources' values instead.
+
*
+
* ```ts
+
* pipe(
+
* zip({
+
* x: fromValue(1),
+
* y: fromArray([2, 3]),
+
* }),
+
* subscribe(result => {
+
* // logs { x: 1, y: 2 } then { x: 1, y: 3 }
+
* console.log(result);
+
* })
+
* );
+
* ```
+
*/
+
interface zip {
+
<Sources extends readonly [...Source<any>[]]>(sources: [...Sources]): Source<
+
TypeOfSourceArray<Sources>
+
>;
+
<Sources extends { [prop: string]: Source<any> }>(sources: Sources): Source<{
+
[Property in keyof Sources]: TypeOfSource<Sources[Property]>;
+
}>;
+
}
+
function zip<T>(sources: Source<T>[] | Record<string, Source<T>>): Source<T[] | Record<string, T>> {
const size = Object.keys(sources).length;
return sink => {
const filled: Set<string | number> = new Set();
···
};
}
+
export { zip };
+
+
/** Combines the latest values of all passed sources into a Source issuing tuple values.
+
*
+
* @see {@link zip | `zip`} which this helper wraps and uses.
+
* @param sources - A variadic list of {@link Source} parameters.
+
* @returns A {@link Source} issuing a zipped value whenever any input Source updates.
+
*
+
* @remarks
+
* `combine` takes one or more {@link Source | Sources} as arguments. Once all input Sources have at
+
* least issued one value it will issue an array of all of the Sources' values. Subsequently, it
+
* will issue a new array value whenever any of the Sources update.
+
*
+
* @example
+
*
+
* ```ts
+
* pipe(
+
* combine(fromValue(1), fromValue(2)),
+
* subscribe(result => {
+
* console.log(result); // logs [1, 2]
+
* })
+
* );
+
* ```
+
*/
export function combine<Sources extends Source<any>[]>(
...sources: Sources
): Source<TypeOfSourceArray<Sources>> {
+21
src/helpers.ts
···
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
export const teardownPlaceholder: TeardownFn = () => {
/*noop*/
};
export const talkbackPlaceholder: TalkbackFn = teardownPlaceholder;
export function start<T>(talkback: TalkbackFn): Start<T> {
const box: any = [talkback];
box.tag = SignalKind.Start;
return box;
}
export function push<T>(value: T): Push<T> {
const box: any = [value];
box.tag = SignalKind.Push;
···
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
+
/** Placeholder {@link TeardownFn | teardown functions} that's a no-op.
+
* @see {@link TeardownFn} for the definition and usage of teardowns.
+
* @internal
+
*/
export const teardownPlaceholder: TeardownFn = () => {
/*noop*/
};
+
+
/** Placeholder {@link TalkbackFn | talkback function} that's a no-op.
+
* @privateRemarks
+
* This is frequently used in the codebase as a no-op initializer value for talkback functions in
+
* the implementation of {@link Operator | Operators}. This is cheaper than initializing the
+
* variables of talkbacks to `undefined` or `null` and performing an extra check before calling
+
* them. Since the {@link Start | Start signal} is assumed to come first and carry a talkback, we can
+
* use this to our advantage and use a no-op placeholder before {@link Start} is received.
+
*
+
* @internal
+
*/
export const talkbackPlaceholder: TalkbackFn = teardownPlaceholder;
+
/** Wraps the passed {@link TalkbackFn | talkback function} in a {@link Start | Start signal}.
+
* @internal
+
*/
export function start<T>(talkback: TalkbackFn): Start<T> {
const box: any = [talkback];
box.tag = SignalKind.Start;
return box;
}
+
/** Wraps the passed value in a {@link Push | Push signal}.
+
* @internal
+
*/
export function push<T>(value: T): Push<T> {
const box: any = [value];
box.tag = SignalKind.Push;
+12
src/index.ts
···
export * from './types';
export * from './sources';
export * from './operators';
···
+
/**
+
* A tiny but capable push & pull stream library for TypeScript and Flow.
+
*
+
* @remarks
+
* Wonka is a lightweight iterable and observable library and exposes a set of helpers to create
+
* streams, which are sources emitting multiple values, which allow you to create, transform, and
+
* consume event streams or iterable sets of data.
+
*
+
* It's loosely based on the Callbag spec: {@link https://github.com/callbag/callbag}
+
* @packageDocumentation
+
*/
+
export * from './types';
export * from './sources';
export * from './operators';
+121 -2
src/observable.ts
···
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';
interface ObservableSubscription {
closed?: boolean;
unsubscribe(): void;
}
interface ObservableObserver<T> {
next(value: T): void;
error?(error: any): void;
complete?(): void;
}
interface ObservableLike<T> {
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
[Symbol.observable]?(): Observable<T>;
}
interface Observable<T> {
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
subscribe(
onNext: (value: T) => any,
onError?: (error: any) => any,
onComplete?: () => any
): ObservableSubscription;
[Symbol.observable](): Observable<T>;
}
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
-
input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input;
return sink => {
-
const subscription = input.subscribe({
next(value: T) {
sink(push(value));
},
···
};
}
export function toObservable<T>(source: Source<T>): Observable<T> {
return {
subscribe(
···
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';
+
/** A definition of the ES Observable Subscription type that is returned by
+
* {@link Observable.subscribe}
+
*
+
* @remarks
+
* The Subscription in ES Observables is a handle that is held while the Observable is actively
+
* streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
+
* whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
+
* ongoing subscription and end the {@link Observable} early.
+
*
+
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
+
*/
interface ObservableSubscription {
+
/** A boolean flag indicating whether the subscription is closed.
+
* @remarks
+
* When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
+
* has terminated. No new values are expected.
+
*
+
* @readonly
+
*/
closed?: boolean;
+
/** Cancels the subscription.
+
* @remarks
+
* This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
+
* subsequently not be called at all. The subscription will be terminated and become inactive.
+
*/
unsubscribe(): void;
}
+
/** A definition of the ES Observable Observer type that is used to receive data from an
+
* {@link Observable}.
+
*
+
* @remarks
+
* The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
+
* an {@link Observable} as it issues them.
+
*
+
* @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
+
* specification of an Observer.
+
*/
interface ObservableObserver<T> {
+
/** Callback for the Observable issuing new values.
+
* @param value - The value that the {@link Observable} is sending.
+
*/
next(value: T): void;
+
/** Callback for the Observable encountering an error, terminating it.
+
* @param error - The error that the {@link Observable} has encountered.
+
*/
error?(error: any): void;
+
/** Callback for the Observable ending, after all values have been issued. */
complete?(): void;
}
+
/** A looser definition of ES Observable-like types that is used for interoperability.
+
* @remarks
+
* The Observable is often used by multiple libraries supporting or creating streams to provide
+
* interoperability for push-based streams. When converting from an Observable to a {@link Source},
+
* this looser type is accepted as an input.
+
*
+
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
+
* @see {@link Observable} for the full ES Observable type.
+
*/
interface ObservableLike<T> {
+
/**
+
* Subscribes to new signals from an {@link Observable} via callbacks.
+
* @param observer - An object containing callbacks for the various events of an Observable.
+
* @returns Subscription handle of type {@link ObservableSubscription}.
+
*
+
* @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
+
* issue events.
+
*/
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
+
+
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable]?(): Observable<T>;
}
+
/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
+
* ecosystem.
+
*
+
* @remarks
+
* The Observable is often used by multiple libraries supporting or creating streams to provide
+
* interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
+
* functionality to Observables, it provides utilities to cleanly convert to and from Observables.
+
*
+
* @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
+
*/
interface Observable<T> {
+
/** Subscribes to new signals from an {@link Observable} via callbacks.
+
* @param observer - An object containing callbacks for the various events of an Observable.
+
* @returns Subscription handle of type {@link ObservableSubscription}.
+
*
+
* @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
+
* issue events.
+
*/
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
+
/** Subscribes to new signals from an {@link Observable} via callbacks.
+
* @param onNext - Callback for the Observable issuing new values.
+
* @param onError - Callback for the Observable encountering an error, terminating it.
+
* @param onComplete - Callback for the Observable ending, after all values have been issued.
+
* @returns Subscription handle of type {@link ObservableSubscription}.
+
*/
subscribe(
onNext: (value: T) => any,
onError?: (error: any) => any,
onComplete?: () => any
): ObservableSubscription;
+
/** The well-known symbol specifying the default ES Observable for an object. */
[Symbol.observable](): Observable<T>;
}
+
/** Returns the well-known symbol specifying the default ES Observable.
+
* @privateRemarks
+
* This symbol is used to mark an object as a default ES Observable. By the specification, an object
+
* that abides by the default Observable implementation must carry a method set to this well-known
+
* symbol that returns the Observable implementation. It's common for this object to be an
+
* Observable itself and return itself on this method.
+
*
+
* @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
+
* between Observable implementations.
+
*
+
* @internal
+
*/
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';
+
/** Converts an ES Observable to a {@link Source}.
+
* @param input - The {@link ObservableLike} object that will be converted.
+
* @returns A {@link Source} wrapping the passed Observable.
+
*
+
* @remarks
+
* This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and
+
* the subscription starts, internally, it'll subscribe to the passed Observable, passing through
+
* all of the Observable's values. As such, this utility provides intercompatibility converting from
+
* standard Observables to Wonka Sources.
+
*
+
* @throws
+
* When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
+
* not support or expect errors to be handled by streams.
+
*/
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
return sink => {
+
const subscription = (
+
input[observableSymbol()] ? input[observableSymbol()]!() : input
+
).subscribe({
next(value: T) {
sink(push(value));
},
···
};
}
+
/** Converts a {@link Source} to an ES Observable.
+
* @param source - The {@link Source} that will be converted.
+
* @returns An {@link Observable} wrapping the passed Source.
+
*
+
* @remarks
+
* This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
+
* internally subscribes to the Wonka Source and pulls new values. As such, this utility provides
+
* intercompatibility converting from Wonka Sources to standard ES Observables.
+
*/
export function toObservable<T>(source: Source<T>): Observable<T> {
return {
subscribe(
+599
src/operators.ts
···
const identity = <T>(x: T): T => x;
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
return source => sink => {
let buffer: T[] = [];
···
};
}
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
const inputQueue: In[] = [];
···
};
}
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
return concatMap<Source<T>, T>(identity)(source);
}
export function concat<T>(sources: Source<T>[]): Source<T> {
return concatAll(fromArray(sources));
}
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
return source => sink =>
source(signal => {
···
});
}
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
let innerTalkbacks: TalkbackFn[] = [];
···
};
}
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
return mergeMap<Source<T>, T>(identity)(source);
}
export function merge<T>(sources: Source<T>[]): Source<T> {
return mergeAll(fromArray(sources));
}
export function onEnd<T>(callback: () => void): Operator<T, T> {
return source => sink => {
let ended = false;
···
};
}
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
return source => sink => {
let ended = false;
···
};
}
export function onStart<T>(callback: () => void): Operator<T, T> {
return source => sink =>
source(signal => {
···
});
}
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
return source => sink => {
let acc = seed;
···
};
}
export function share<T>(source: Source<T>): Source<T> {
let sinks: Sink<T>[] = [];
let talkback = talkbackPlaceholder;
···
};
}
export function skip<T>(wait: number): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
let outerTalkback = talkbackPlaceholder;
···
};
}
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
return switchMap<Source<T>, T>(identity)(source);
}
export function take<T>(max: number): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
export function takeLast<T>(max: number): Operator<T, T> {
return source => sink => {
const queue: T[] = [];
···
};
}
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
return source => sink => {
let id: any | void;
···
};
}
export function delay<T>(wait: number): Operator<T, T> {
return source => sink => {
let active = 0;
···
};
}
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
return source => sink => {
let skip = false;
···
const identity = <T>(x: T): T => x;
+
/** Buffers values and emits the array of bufferd values each time a `notifier` Source emits.
+
*
+
* @param notifier - A {@link Source} that releases the current buffer.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source
+
* emits, it will emit an array of all buffered values.
+
*
+
* This can be used to group values over time. A buffer will only be emitted when it contains any
+
* values.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* buffer(interval(100)),
+
* subscribe(x => {
+
* console.log(text); // logs: [0], [1, 2], [3, 4]...
+
* })
+
* );
+
* ```
+
*/
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
return source => sink => {
let buffer: T[] = [];
···
};
}
+
/** Emits in order from the Sources returned by a mapping function per value of the Source.
+
*
+
* @param map - A function returning a {@link Source} per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `concatMap` accepts a mapping function which must return a {@link Source} per value.
+
* The output {@link Source} will emit values from each Source the function returned, in order,
+
* queuing sources that aren't yet active.
+
*
+
* This can be used to issue multiple values per emission of an input {@link Source}, while keeping
+
* the order of their outputs consistent.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2]),
+
* concatMap(x => fromArray([x, x * 2])),
+
* subscribe(x => {
+
* console.log(text); // logs: 1, 2, 2, 4
+
* })
+
* );
+
* ```
+
*/
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
const inputQueue: In[] = [];
···
};
}
+
/** Flattens a Source emitting Sources into a single Source emitting the inner values in order.
+
*
+
* @see {@link concatMap} which this helper uses and instead accept a mapping function.
+
* @param source - An {@link Source} emitting {@link Source | Sources}.
+
* @returns A {@link Source} emitting values from the inner Sources.
+
*
+
* @remarks
+
* `concatAll` accepts a {@link Source} emitting {@link Source | Sources}.
+
* The output {@link Source} will emit values from each Source, in order, queuing sources that
+
* aren't yet active.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([
+
* fromArray([1, 2]),
+
* fromArray([3, 4]),
+
* ]),
+
* concatAll,
+
* subscribe(x => {
+
* console.log(text); // logs: 1, 2, 3, 4
+
* })
+
* );
+
* ```
+
*/
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
return concatMap<Source<T>, T>(identity)(source);
}
+
/** Emits values from the passed sources in order.
+
*
+
* @param sources - An array of {@link Source | Sources}.
+
* @returns A {@link Source} emitting values from the input Sources.
+
*
+
* @remarks
+
* `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting
+
* with the first one and continuing to the next only when the prior source ended.
+
*
+
* This can be used to issue combine sources while keeping the order of their values intact.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* concat([
+
* fromArray([1, 2]),
+
* fromArray([3, 4]),
+
* ]),
+
* subscribe(x => {
+
* console.log(text); // logs: 1, 2, 3, 4
+
* })
+
* );
+
* ```
+
*/
export function concat<T>(sources: Source<T>[]): Source<T> {
return concatAll(fromArray(sources));
}
+
/** Filters out emitted values for which the passed predicate function returns `false`.
+
*
+
* @param predicate - A function returning a boolean per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `filter` will omit values from the {@link Source} for which the passed `predicate` function
+
* returns `false`.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* filter(x => x % 2 === 0),
+
* subscribe(x => {
+
* console.log(text); // logs: 2
+
* })
+
* );
+
* ```
+
*/
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Maps emitted values using the passed mapping function.
+
*
+
* @param map - A function returning transforming the {@link Source | Source's} values.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `map` accepts a transform function and calls it on each emitted value. It then emits
+
* the values returned by the transform function instead.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* map(x => x * 2),
+
* subscribe(x => {
+
* console.log(text); // logs: 2, 4, 6
+
* })
+
* );
+
* ```
+
*/
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
return source => sink =>
source(signal => {
···
});
}
+
/** Emits from the Sources returned by a mapping function per value of the Source.
+
*
+
* @param map - A function returning a {@link Source} per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `mergeMap` accepts a mapping function which must return a {@link Source} per value.
+
* The output {@link Source} will emit values from all {@link Source | Sources} the mapping function
+
* returned.
+
*
+
* This can be used to issue multiple values per emission of an input {@link Source}, essentially
+
* multiplexing all values to multiple Sources.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* mergeMap(x => pipe(
+
* fromValue(x),
+
* delay(100)
+
* )),
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 1, 2...
+
* })
+
* );
+
* ```
+
*/
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
let innerTalkbacks: TalkbackFn[] = [];
···
};
}
+
/** Flattens a Source emitting Sources into a single Source emitting the inner values.
+
*
+
* @see {@link mergeMap} which this helper uses and instead accept a mapping function.
+
* @param source - An {@link Source} emitting {@link Source | Sources}.
+
* @returns A {@link Source} emitting values from the inner Sources.
+
*
+
* @remarks
+
* `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe
+
* to each incoming source immediately and start passing its emitted values through.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([
+
* interval(50),
+
* interval(100),
+
* ]),
+
* mergeAll,
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
+
* })
+
* );
+
* ```
+
*/
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
return mergeMap<Source<T>, T>(identity)(source);
}
+
/** Emits values from the passed sources simultaneously.
+
*
+
* @param sources - An array of {@link Source | Sources}.
+
* @returns A {@link Source} emitting values from the input Sources.
+
*
+
* @remarks
+
* `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing
+
* through all their emitted values simultaneously.
+
*
+
* This can be used to interleave the values of multiple sources.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* merge([
+
* interval(50),
+
* interval(100),
+
* ]),
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
+
* })
+
* );
+
* ```
+
*/
export function merge<T>(sources: Source<T>[]): Source<T> {
return mergeAll(fromArray(sources));
}
+
/** Calls the passed callback function when the Source ends or is closed.
+
*
+
* @param callback - A function that is called when the {@link Source} ends.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `onEnd` accepts a callback which is called when the {@link Source} either ends
+
* or is closed.
+
*
+
* This operator can be used to add side-effects to a Source.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* take(1),
+
* onEnd(() => {
+
* console.log('end');
+
* }),
+
* publish
+
* );
+
* ```
+
*/
export function onEnd<T>(callback: () => void): Operator<T, T> {
return source => sink => {
let ended = false;
···
};
}
+
/** Calls the passed callback function when the Source emits a value.
+
*
+
* @param callback - A function that is called with each value the {@link Source} emits.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `onPush` accepts a callback which is called for every emitted value of
+
* the {@link Source}.
+
*
+
* This operator can be used to add side-effects to a Source.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* onPush(value => {
+
* console.log(value); // logs: 1, 2, 3
+
* }),
+
* publish
+
* );
+
* ```
+
*/
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
return source => sink => {
let ended = false;
···
};
}
+
/** Calls the passed callback function when the Source starts.
+
*
+
* @param callback - A function that is called when the {@link Source} is started.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `onPush` accepts a callback which is called for every emitted value of
+
* the {@link Source}.
+
*
+
* This operator can be used to add side-effects to a Source.
+
* Specifically, it's useful to add a side-effect for a Source that triggers only once
+
* the {@link Source} is used and started.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* onStart(() => {
+
* console.log('start');
+
* }),
+
* publish
+
* );
+
* ```
+
*/
export function onStart<T>(callback: () => void): Operator<T, T> {
return source => sink =>
source(signal => {
···
});
}
+
/** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value.
+
*
+
* @param notifier - A {@link Source} that triggers the last value to be emitted.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source
+
* emits, it will emit the latest value.
+
*
+
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
+
* too frequently.
+
*
+
* {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be
+
* ignored.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* sample(interval(100)),
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 2, 4...
+
* })
+
* );
+
* ```
+
*/
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
+
/** Maps emitted values using the passed reducer function.
+
*
+
* @param reducer - A function called with the last value by the `reducer` and the emitted value.
+
* @param seed - The initial value that is passed to the `reducer`.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `scan` accepts a reducer function and a seed value. The reducer will be called initially with the
+
* seed value and the first emitted value. The {@link Source} will then emit the value returned by
+
* the reducer function. Subsequently, the `reducer` is called with the last value the `reducer`
+
* returned and the emitted value.
+
*
+
* This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits
+
* each value of the reducer.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* scan((acc, x) => acc + x, 0),
+
* subscribe(x => {
+
* console.log(text); // logs: 1, 3, 6
+
* })
+
* );
+
* ```
+
*/
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
return source => sink => {
let acc = seed;
···
};
}
+
/** Shares one underlying subscription to the Source between all Sinks.
+
*
+
* @param source - A {@link Source} that should be shared.
+
* @returns A shared {@link Source}.
+
*
+
* @remarks
+
* `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it
+
* will share one subscription to the input source. This allows side-effects on the input
+
* {@link Source} to only be triggerd once.
+
*/
export function share<T>(source: Source<T>): Source<T> {
let sinks: Sink<T>[] = [];
let talkback = talkbackPlaceholder;
···
};
}
+
/** Omits `wait` amount of values from the Source and then runs as usual.
+
*
+
* @param wait - The number of values to be omitted.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards.
+
* This essentially skips a given number of values on the input {@link Source}.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* skip(2),
+
* subscribe(x => {
+
* console.log(text); // logs: 3
+
* })
+
* );
+
* ```
+
*/
export function skip<T>(wait: number): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Omits values from an input Source until a notifier Source emits a value.
+
*
+
* @param notifier - A {@link Source} that starts the operator's sent values.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `skipUntil` will omit all values from the input {@link Source} until the `notifier`
+
* Source emits a value of its own. It'll then start passing values from the Source through.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* skipUntil(interval(150)),
+
* subscribe(x => {
+
* console.log(text); // logs: 2, 3...
+
* })
+
* );
+
* ```
+
*/
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
+
/** Omits values from an input Source until a predicate function returns `false`.
+
*
+
* @param predicate - A function returning a boolean per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `skipWhile` will omit all values from the input {@link Source} until the `predicate`
+
* function returns `false`. When the `predicate` function returns `false`, the Source's values will
+
* be passed through.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* skipWhile(x => x < 2),
+
* subscribe(x => {
+
* console.log(text); // logs: 2, 3
+
* })
+
* );
+
* ```
+
*/
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Emits from the latest Source returned by a mapping function per value of the Source.
+
*
+
* @param map - A function returning a {@link Source} per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `switchMap` accepts a mapping function which must return a {@link Source} per value.
+
* The output {@link Source} will emit values from the latest Source the mapping function
+
* returned. If a value is emitted while the last returned Source is still active, the prior Source
+
* will be closed.
+
*
+
* This can be used to issue multiple values per emission of an input {@link Source}, while only
+
* letting one of these sub-Sources be active at a time.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(100),
+
* switchMap(() => interval(50)),
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 0, 0...
+
* })
+
* );
+
* ```
+
*/
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
let outerTalkback = talkbackPlaceholder;
···
};
}
+
/** Flattens a Source emitting Sources into a single Source emitting the inner values.
+
*
+
* @see {@link switchMap} which this helper uses and instead accept a mapping function.
+
* @param source - An {@link Source} emitting {@link Source | Sources}.
+
* @returns A {@link Source} emitting values from the inner Sources.
+
*
+
* @remarks
+
* `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it
+
* receives a {@link Source} it will close its prior subscription and subscribe to the new Source
+
* instead, passing through its values.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(100),
+
* map(() => interval(50)),
+
* switchAll,
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 0, 0...
+
* })
+
* );
+
* ```
+
*/
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
return switchMap<Source<T>, T>(identity)(source);
}
+
/** Emits `max` values from the Source and then ends.
+
*
+
* @param max - The maximum number of values emitted.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `take` will issue all values as normal until the `max` number of emitted values has been reached.
+
* It will then end and close the {@link Source}.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* take(2),
+
* subscribe(x => {
+
* console.log(text); // logs: 1, 2
+
* })
+
* );
+
* ```
+
*/
export function take<T>(max: number): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Buffers the `max` last values of the Source and emits them once the Source ends.
+
*
+
* @param max - The maximum number of values buffered.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It
+
* will only emit values stored in the buffer once the {@link Source} ends.
+
*
+
* All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would
+
* synchronously.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* takeLast(1),
+
* subscribe(x => {
+
* console.log(text); // logs: 3
+
* })
+
* );
+
* ```
+
*/
export function takeLast<T>(max: number): Operator<T, T> {
return source => sink => {
const queue: T[] = [];
···
};
}
+
/** Takes values from an input Source until a notifier Source emits a value.
+
*
+
* @param notifier - A {@link Source} that stops the operator's sent values.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier`
+
* Source emits a value of its own. It'll then close the {@link Source}.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* takeUntil(interval(150)),
+
* subscribe(x => {
+
* console.log(text); // logs: 0, 1
+
* })
+
* );
+
* ```
+
*/
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
···
};
}
+
/** Takes values from an input Source until a predicate function returns `false`.
+
*
+
* @param predicate - A function returning a boolean per value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate`
+
* function returns `false`. When the `predicate` function returns `false`, the current value is
+
* omitted and the {@link Source} is closed.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* takeWhile(x => x < 2),
+
* subscribe(x => {
+
* console.log(text); // logs: 1
+
* })
+
* );
+
* ```
+
*/
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
return source => sink => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Debounces a Source by omitting values until a given timeframe has passed.
+
*
+
* @param timing - A function returning a debounce time (ms) per emitted value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted
+
* value. All emitted values issued by the {@link Source} during the returned time will be omitted
+
* until the time has passed.
+
*
+
* Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a
+
* value is let through.
+
*
+
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
+
* too frequently.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* debounce(() => 100),
+
* subscribe(x => {
+
* console.log(text); // never logs any value
+
* })
+
* );
+
* ```
+
*/
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
return source => sink => {
let id: any | void;
···
};
}
+
/** Delays each signal emitted by a Source by given time (ms).
+
*
+
* @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by.
+
* This will create a timeout per received signal and delay the emitted values accordingly.
+
*
+
* Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to
+
* be ordered. Otherwise, signals will arrive in the wrong order at the sink.
+
*/
export function delay<T>(wait: number): Operator<T, T> {
return source => sink => {
let active = 0;
···
};
}
+
/** Throttles a Source by omitting values that are emitted before a given timeout.
+
*
+
* @param timing - A function returning a throttle time (ms) per emitted value.
+
* @returns An {@link Operator}.
+
*
+
* @remarks
+
* `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted
+
* value. During the returned timeframe all values issued by the {@link Source} will be omitted and
+
* dropped.
+
*
+
* This is a back pressure operator that can be used to omit values from a {@link Source} coming in
+
* too frequently.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* interval(50),
+
* throttle(() => 100),
+
* subscribe(x => {
+
* // omits every second value: 0, 2, 4...
+
* console.log(text);
+
* })
+
* );
+
* ```
+
*/
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
return source => sink => {
let skip = false;
+155 -130
src/pipe.ts
···
-
import { Source } from './types';
interface UnaryFn<T, R> {
(source: T): R;
}
-
/* pipe definitions for source + operators composition */
-
function pipe<T, A>(source: Source<T>, op1: UnaryFn<Source<T>, Source<A>>): Source<A>;
-
function pipe<T, A, B>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>
-
): Source<B>;
-
function pipe<T, A, B, C>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>
-
): Source<C>;
-
function pipe<T, A, B, C, D>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>
-
): Source<D>;
-
function pipe<T, A, B, C, D, E>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>
-
): Source<E>;
-
function pipe<T, A, B, C, D, E, F>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>
-
): Source<F>;
-
function pipe<T, A, B, C, D, E, F, G>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>,
-
op7: UnaryFn<Source<F>, Source<G>>
-
): Source<G>;
-
function pipe<T, A, B, C, D, E, F, G, H>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>,
-
op7: UnaryFn<Source<F>, Source<G>>,
-
op8: UnaryFn<Source<G>, Source<H>>
-
): Source<H>;
-
/* pipe definitions for source + operators + consumer composition */
-
function pipe<T, R>(source: Source<T>, consumer: UnaryFn<Source<T>, R>): R;
-
function pipe<T, A, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
consumer: UnaryFn<Source<A>, R>
-
): R;
-
function pipe<T, A, B, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
consumer: UnaryFn<Source<B>, R>
-
): R;
-
function pipe<T, A, B, C, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
consumer: UnaryFn<Source<C>, R>
-
): R;
-
function pipe<T, A, B, C, D, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
consumer: UnaryFn<Source<D>, R>
-
): R;
-
function pipe<T, A, B, C, D, E, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
consumer: UnaryFn<Source<E>, R>
-
): R;
-
function pipe<T, A, B, C, D, E, F, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>,
-
consumer: UnaryFn<Source<F>, R>
-
): R;
-
function pipe<T, A, B, C, D, E, F, G, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>,
-
op7: UnaryFn<Source<F>, Source<G>>,
-
consumer: UnaryFn<Source<G>, R>
-
): R;
-
function pipe<T, A, B, C, D, E, F, G, H, R>(
-
source: Source<T>,
-
op1: UnaryFn<Source<T>, Source<A>>,
-
op2: UnaryFn<Source<A>, Source<B>>,
-
op3: UnaryFn<Source<B>, Source<C>>,
-
op4: UnaryFn<Source<C>, Source<D>>,
-
op5: UnaryFn<Source<D>, Source<E>>,
-
op6: UnaryFn<Source<E>, Source<F>>,
-
op7: UnaryFn<Source<F>, Source<G>>,
-
op8: UnaryFn<Source<G>, Source<H>>,
-
consumer: UnaryFn<Source<H>, R>
-
): R;
-
function pipe(...args: any[]) {
let x = args[0];
for (let i = 1, l = args.length; i < l; i++) x = args[i](x);
return x;
···
+
import { Source, Sink, Operator } from './types';
interface UnaryFn<T, R> {
(source: T): R;
}
+
/** Chain calls operators on a given source and returns the last result.
+
* @param args - A source, then a variable number of transform functions
+
*
+
* @remarks
+
* The `pipe` utility can be called with a {@link Source} then one or more unary transform functions.
+
* Each transform function will be called in turn with the last function's return value, starting
+
* with the source passed as the first argument to `pipe`.
+
*
+
* It's used to transform a source with a list of {@link Operator | Operators}. The last argument may
+
* also be a {@link Sink} that returns something else than a Source.
+
*
+
* @example
+
*
+
* ```ts
+
* pipe(
+
* fromArray([1, 2, 3]),
+
* map(x => x * 2),
+
* subscribe(console.log)
+
* );
+
* ```
+
*
+
* @see {@link https://github.com/tc39/proposal-pipeline-operator} for the JS Pipeline Operator spec, for which this is a replacement utility for.
+
*/
+
interface pipe {
+
/* pipe definitions for source + operators composition */
+
<T, A>(source: Source<T>, op1: UnaryFn<Source<T>, Source<A>>): Source<A>;
+
<T, A, B>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>
+
): Source<B>;
+
<T, A, B, C>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>
+
): Source<C>;
+
<T, A, B, C, D>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>
+
): Source<D>;
+
<T, A, B, C, D, E>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>
+
): Source<E>;
+
<T, A, B, C, D, E, F>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>
+
): Source<F>;
+
<T, A, B, C, D, E, F, G>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>,
+
op7: UnaryFn<Source<F>, Source<G>>
+
): Source<G>;
+
<T, A, B, C, D, E, F, G, H>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>,
+
op7: UnaryFn<Source<F>, Source<G>>,
+
op8: UnaryFn<Source<G>, Source<H>>
+
): Source<H>;
+
/* pipe definitions for source + operators + consumer composition */
+
<T, R>(source: Source<T>, consumer: UnaryFn<Source<T>, R>): R;
+
<T, A, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
consumer: UnaryFn<Source<A>, R>
+
): R;
+
<T, A, B, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
consumer: UnaryFn<Source<B>, R>
+
): R;
+
<T, A, B, C, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
consumer: UnaryFn<Source<C>, R>
+
): R;
+
<T, A, B, C, D, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
consumer: UnaryFn<Source<D>, R>
+
): R;
+
<T, A, B, C, D, E, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
consumer: UnaryFn<Source<E>, R>
+
): R;
+
<T, A, B, C, D, E, F, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>,
+
consumer: UnaryFn<Source<F>, R>
+
): R;
+
<T, A, B, C, D, E, F, G, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>,
+
op7: UnaryFn<Source<F>, Source<G>>,
+
consumer: UnaryFn<Source<G>, R>
+
): R;
+
<T, A, B, C, D, E, F, G, H, R>(
+
source: Source<T>,
+
op1: UnaryFn<Source<T>, Source<A>>,
+
op2: UnaryFn<Source<A>, Source<B>>,
+
op3: UnaryFn<Source<B>, Source<C>>,
+
op4: UnaryFn<Source<C>, Source<D>>,
+
op5: UnaryFn<Source<D>, Source<E>>,
+
op6: UnaryFn<Source<E>, Source<F>>,
+
op7: UnaryFn<Source<F>, Source<G>>,
+
op8: UnaryFn<Source<G>, Source<H>>,
+
consumer: UnaryFn<Source<H>, R>
+
): R;
+
}
+
function pipe(...args: Function[]): any {
let x = args[0];
for (let i = 1, l = args.length; i < l; i++) x = args[i](x);
return x;
+122
src/sinks.ts
···
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';
export function subscribe<T>(subscriber: (value: T) => void) {
return (source: Source<T>): Subscription => {
let talkback = talkbackPlaceholder;
···
};
}
export function forEach<T>(subscriber: (value: T) => void) {
return (source: Source<T>): void => {
subscribe(subscriber)(source);
};
}
export function publish<T>(source: Source<T>): void {
subscribe(_value => {
/*noop*/
···
const doneResult = { done: true } as IteratorReturnResult<void>;
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
[Symbol.asyncIterator](): AsyncIterator<T> {
const buffer: T[] = [];
···
},
});
export function toArray<T>(source: Source<T>): T[] {
const values: T[] = [];
let talkback = talkbackPlaceholder;
···
return values;
}
export function toPromise<T>(source: Source<T>): Promise<T> {
return new Promise(resolve => {
let talkback = talkbackPlaceholder;
···
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
import { talkbackPlaceholder } from './helpers';
+
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
+
* @param subscriber - A callback function called for each issued value.
+
* @returns A function accepting a {@link Source} and returning a {@link Subscription}.
+
*
+
* @remarks
+
* `subscribe` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
+
* When a source is passed to the returned funtion, the subscription will start and `subscriber`
+
* will be called for each new value the Source issues. This will also return a {@link Subscription}
+
* object that can cancel the ongoing {@link Source} early.
+
*
+
* @example
+
* ```ts
+
* const subscription = pipe(
+
* fromValue('test'),
+
* subscribe(text => {
+
* console.log(text); // 'test'
+
* })
+
* );
+
* ```
+
*/
export function subscribe<T>(subscriber: (value: T) => void) {
return (source: Source<T>): Subscription => {
let talkback = talkbackPlaceholder;
···
};
}
+
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
+
* @see {@link subscribe} which this helper aliases without returnin a {@link Subscription}.
+
* @param subscriber - A callback function called for each issued value.
+
* @returns A function accepting a {@link Source}.
+
*
+
* @remarks
+
* `forEach` accepts a `subscriber` callback and returns a function accepting a {@link Source}.
+
* When a source is passed to the returned funtion, the subscription will start and `subscriber`
+
* will be called for each new value the Source issues. Unlike `subscribe` it will not return a
+
* Subscription object and can't be cancelled early.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* fromValue('test'),
+
* forEach(text => {
+
* console.log(text); // 'test'
+
* })
+
* ); // undefined
+
* ```
+
*/
export function forEach<T>(subscriber: (value: T) => void) {
return (source: Source<T>): void => {
subscribe(subscriber)(source);
};
}
+
/** Creates a subscription to a given source and invokes a `subscriber` callback for each value.
+
* @see {@link subscribe} which this helper aliases without accepting parameters or returning a
+
* {@link Subscription | Subscription}.
+
*
+
* @param source - A {@link Source}.
+
*
+
* @remarks
+
* `publish` accepts a {@link Source} and subscribes to it, starting its values. The resulting
+
* values cannot be observed and the subscription can't be cancelled, as this helper is purely
+
* intended to start side-effects.
+
*
+
* @example
+
* ```ts
+
* pipe(
+
* lazy(() => {
+
* console.log('test'); // this is called
+
* return fromValue(123); // this is never used
+
* }),
+
* publish
+
* ); // undefined
+
* ```
+
*/
export function publish<T>(source: Source<T>): void {
subscribe(_value => {
/*noop*/
···
const doneResult = { done: true } as IteratorReturnResult<void>;
+
/** Converts a Source to an AsyncIterable that pulls and issues values from the Source.
+
*
+
* @param source - A {@link Source}.
+
* @returns An {@link AsyncIterable | `AsyncIterable`} issuing values from the Source.
+
*
+
* @remarks
+
* `toAsyncIterable` will create an {@link AsyncIterable} that pulls and issues values from a given
+
* {@link Source}. This can be used in many interoperability situations, to provide an iterable when
+
* a consumer requires it.
+
*
+
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
+
* for the JS Iterable protocol.
+
*
+
* @example
+
* ```ts
+
* const iterable = toAsyncIterable(fromArray([1, 2, 3]));
+
* for await (const value of iterable) {
+
* console.log(value); // outputs: 1, 2, 3
+
* }
+
* ```
+
*/
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
[Symbol.asyncIterator](): AsyncIterator<T> {
const buffer: T[] = [];
···
},
});
+
/** Subscribes to a given source and collects all synchronous values into an array.
+
* @param source - A {@link Source}.
+
* @returns An array of values collected from the {@link Source}.
+
*
+
* @remarks
+
* `toArray` accepts a {@link Source} and returns an array of all synchronously issued values from
+
* this Source. It will issue {@link TalkbackKind.Pull | Pull signals} after every value it receives
+
* and expects the Source to recursively issue values.
+
*
+
* Any asynchronously issued values will not be
+
* added to the array and a {@link TalkbackKind.Close | Close signal} is issued by the sink before
+
* returning the array.
+
*
+
* @example
+
* ```ts
+
* toArray(fromArray([1, 2, 3])); // [1, 2, 3]
+
* ```
+
*/
export function toArray<T>(source: Source<T>): T[] {
const values: T[] = [];
let talkback = talkbackPlaceholder;
···
return values;
}
+
/** Subscribes to a given source and returns a Promise that will resolve with the last value the
+
* source issues.
+
*
+
* @param source - A {@link Source}.
+
* @returns A {@link Promise} resolving to the last value of the {@link Source}.
+
*
+
* @remarks
+
* `toPromise` will subscribe to the passed {@link Source} and resolve to the last value of it once
+
* it receives the last value, as signaled by the {@link SignalKind.End | End signal}.
+
*
+
* To keep its implementation simple, padding sources that don't issue any values to `toPromise` is
+
* undefined behaviour and `toPromise` will issue `undefined` in that case.
+
*
+
* The returned {@link Promise} delays its value by a microtick, using `Promise.resolve`.
+
*
+
* @example
+
* ```ts
+
* toPromise(fromValue('test')); // resolves: 'test'
+
* ```
+
*/
export function toPromise<T>(source: Source<T>): Promise<T> {
return new Promise(resolve => {
let talkback = talkbackPlaceholder;
+206 -4
src/sources.ts
···
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';
-
export function lazy<T>(make: () => Source<T>): Source<T> {
-
return sink => make()(sink);
}
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
return sink => {
const iterator = iterable[Symbol.asyncIterator]();
···
};
}
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);
return sink => {
···
};
}
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
export function fromValue<T>(value: T): Source<T> {
return sink => {
let ended = false;
···
};
}
-
export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
return sink => {
let ended = false;
-
const teardown = produce({
next(value: T) {
if (!ended) sink(push(value));
},
···
};
}
export function makeSubject<T>(): Subject<T> {
let next: Subject<T>['next'] | void;
let complete: Subject<T>['complete'] | void;
···
};
}
export const empty: Source<any> = (sink: Sink<any>): void => {
let ended = false;
sink(
···
);
};
export const never: Source<any> = (sink: Sink<any>): void => {
sink(start(talkbackPlaceholder));
};
export function interval(ms: number): Source<number> {
return make(observer => {
let i = 0;
···
});
}
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
return make(observer => {
element.addEventListener(event, observer.next);
···
});
}
export function fromPromise<T>(promise: Promise<T>): Source<T> {
return make(observer => {
promise.then(value => {
···
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';
+
/** Helper creating a Source from a factory function when it's subscribed to.
+
* @param produce - A factory function returning a {@link Source}.
+
* @returns A {@link Source} lazyily subscribing to the Source returned by the given factory
+
* function.
+
*
+
* @remarks
+
* At times it's necessary to create a {@link Source} lazily. The time of a {@link Source} being
+
* created could be different from when it's subscribed to, and hence we may want to split the
+
* creation and subscription time. This is especially useful when the Source we wrap is "hot" and
+
* issues values as soon as it's created, which we may then not receive in a subscriber.
+
*
+
* @example An example of creating a {@link Source} that issues the timestamp of subscription. Here
+
* we effectively use `lazy` with the simple {@link fromValue | `fromValue`} source, to quickly
+
* create a Source that issues the time of its subscription, rather than the time of its creation
+
* that it would otherwise issue without `lazy`.
+
*
+
* ```ts
+
* lazy(() => fromValue(Date.now()));
+
* ```
+
*/
+
export function lazy<T>(produce: () => Source<T>): Source<T> {
+
return sink => produce()(sink);
}
+
/** Converts an AsyncIterable to a Source that pulls and issues values from it as requested.
+
*
+
* @see {@link fromIterable | `fromIterable`} for the non-async Iterable version of this helper,
+
* which calls this helper automatically as needed.
+
*
+
* @param iterable - An {@link AsyncIterable | `AsyncIterable`}.
+
* @returns A {@link Source} issuing values sourced from the Iterable.
+
*
+
* @remarks
+
* `fromAsyncIterable` will create a {@link Source} that pulls and issues values from a given
+
* {@link AsyncIterable}. This can be used in many interoperability situations, including to consume
+
* an async generator function.
+
*
+
* When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
+
* using {@link AsyncIterator.throw}, which allows an async generator to recover from the exception.
+
*
+
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols}
+
* for the JS Iterable protocol.
+
*/
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
return sink => {
const iterator = iterable[Symbol.asyncIterator]();
···
};
}
+
/** Converts an Iterable to a Source that pulls and issues values from it as requested.
+
* @see {@link fromAsyncIterable | `fromAsyncIterable`} for the AsyncIterable version of this helper.
+
* @param iterable - An {@link Iterable | `Iterable`} or an `AsyncIterable`
+
* @returns A {@link Source} issuing values sourced from the Iterable.
+
*
+
* @remarks
+
* `fromIterable` will create a {@link Source} that pulls and issues values from a given
+
* {@link Iterable | JS Iterable}. As iterables are the common standard for any lazily iterated list
+
* of values in JS it can be applied to many different JS data types, including a JS Generator
+
* function.
+
*
+
* This Source will only call {@link Iterator.next} on the iterator when the subscribing {@link Sink}
+
* has pulled a new value with the {@link TalkbackKind.Pull | Pull signal}. `fromIterable` can
+
* therefore also be applied to "infinite" iterables, without a predefined end.
+
*
+
* This helper will call {@link fromAsyncIterable | `fromAsyncIterable`} automatically when the
+
* passed object also implements the async iterator protocol.
+
*
+
* When the {@link Sink} throws an exception when a new value is pushed, this helper will rethrow it
+
* using {@link Iterator.throw}, which allows a generator to recover from the exception.
+
*
+
* @see {@link https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_iterable_protocol}
+
* for the JS Iterable protocol.
+
*/
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);
return sink => {
···
};
}
+
/** Creates a Source that issues a each value of a given array synchronously.
+
* @see {@link fromIterable} which `fromArray` aliases.
+
* @param array - The array whose values will be issued one by one.
+
* @returns A {@link Source} issuing the array's values.
+
*
+
* @remarks
+
* `fromArray` will create a {@link Source} that issues the values of a given JS array one by one. It
+
* will issue values as they're pulled and is hence a "cold" source, not eagerly emitting values. It
+
* will end and issue the {@link SignalKind.End | End signal} when the array is exhausted of values.
+
*
+
* @example
+
* ```ts
+
* fromArray([1, 2, 3]);
+
* ```
+
*/
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
+
/** Creates a Source that issues a single value and ends immediately after.
+
* @param value - The value that will be issued.
+
* @returns A {@link Source} issuing the single value.
+
*
+
* @example
+
* ```ts
+
* fromValue('test');
+
* ```
+
*/
export function fromValue<T>(value: T): Source<T> {
return sink => {
let ended = false;
···
};
}
+
/** Creates a new Source from scratch from a passed `subscriber` function.
+
* @param subscriber - A callback that is called when the {@link Source} is subscribed to.
+
* @returns A {@link Source} created from the `subscriber` parameter.
+
*
+
* @remarks
+
* `make` is used to create a new, arbitrary {@link Source} from scratch. It calls the passed
+
* `subscriber` function when it's subscribed to.
+
*
+
* The `subscriber` function receives an {@link Observer}. You may call {@link Observer.next} to
+
* issue values on the Source, and {@link Observer.complete} to end the Source.
+
*
+
* Your `subscribr` function must return a {@link TeardownFn | teardown function} which is only
+
* called when your source is cancelled — not when you invoke `complete` yourself. As this creates a
+
* "cold" source, every time this source is subscribed to, it will invoke the `subscriber` function
+
* again and create a new source.
+
*
+
* @example
+
*
+
* ```ts
+
* make(observer => {
+
* const frame = requestAnimationFrame(() => {
+
* observer.next('animate!');
+
* });
+
* return () => {
+
* cancelAnimationFrame(frame);
+
* };
+
* });
+
* ```
+
*/
+
export function make<T>(subscriber: (observer: Observer<T>) => TeardownFn): Source<T> {
return sink => {
let ended = false;
+
const teardown = subscriber({
next(value: T) {
if (!ended) sink(push(value));
},
···
};
}
+
/** Creates a new Subject which can be used as an IO event hub.
+
* @returns A new {@link Subject}.
+
*
+
* @remarks
+
* `makeSubject` creates a new {@link Subject}. A Subject is a {@link Source} and an {@link Observer}
+
* combined in one interface, as the Observer is used to send new signals to the Source. This means
+
* that it's "hot" and hence all subscriptions to {@link Subject.source} share the same underlying
+
* signals coming from {@link Subject.next} and {@link Subject.complete}.
+
*
+
* @example
+
* ```ts
+
* const subject = makeSubject();
+
* pipe(subject.source, subscribe(console.log));
+
* // This will log the string on the above subscription
+
* subject.next('hello subject!');
+
* ```
+
*/
export function makeSubject<T>(): Subject<T> {
let next: Subject<T>['next'] | void;
let complete: Subject<T>['complete'] | void;
···
};
}
+
/** A {@link Source} that immediately ends.
+
* @remarks
+
* `empty` is a {@link Source} that immediately issues an {@link SignalKind.End | End signal} when
+
* it's subscribed to, ending immediately.
+
*
+
* @see {@link never | `never`} for a source that instead never ends.
+
*/
export const empty: Source<any> = (sink: Sink<any>): void => {
let ended = false;
sink(
···
);
};
+
/** A {@link Source} without values that never ends.
+
* @remarks
+
* `never` is a {@link Source} that never issues any signals and neither sends values nor ends.
+
*
+
* @see {@link empty | `empty`} for a source that instead ends immediately.
+
*/
export const never: Source<any> = (sink: Sink<any>): void => {
sink(start(talkbackPlaceholder));
};
+
/** Creates a Source that issues an incrementing integer in intervals.
+
* @param ms - The interval in milliseconds.
+
* @returns A {@link Source} issuing an incrementing count on each interval.
+
*
+
* @remarks
+
* `interval` will create a {@link Source} that issues an incrementing counter each time the `ms`
+
* interval expires.
+
*
+
* It'll only stop when it's cancelled by a {@link TalkbackKind.Close | Close signal}.
+
*
+
* @example
+
* An example printing `0`, then `1`, and so on, in intervals of 50ms.
+
*
+
* ```ts
+
* pipe(interval(50), subscribe(console.log));
+
* ```
+
*/
export function interval(ms: number): Source<number> {
return make(observer => {
let i = 0;
···
});
}
+
/** Converts DOM Events to a Source given an `HTMLElement` and an event's name.
+
* @param element - The {@link HTMLElement} to listen to.
+
* @param event - The DOM Event name to listen to.
+
* @returns A {@link Source} issuing the {@link Event | DOM Events} as they're issued by the DOM.
+
*
+
* @remarks
+
* `fromDomEvent` will create a {@link Source} that listens to the given element's events and issues
+
* them as values on the source. This source will only stop when it's cancelled by a
+
* {@link TalkbackKind.Close | Close signal}.
+
*
+
* @example
+
* An example printing `'clicked!'` when the given `#root` element is clicked.
+
*
+
* ```ts
+
* const element = document.getElementById('root');
+
* pipe(
+
* fromDomEvent(element, 'click'),
+
* subscribe(() => console.log('clicked!'))
+
* );
+
* ```
+
*/
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
return make(observer => {
element.addEventListener(event, observer.next);
···
});
}
+
/** Converts a Promise to a Source that issues the resolving Promise's value and then ends.
+
* @param promise - The promise that will be wrapped.
+
* @returns A {@link Source} issuing the promise's value when it resolves.
+
*
+
* @remarks
+
* `fromPromise` will create a {@link Source} that issues the {@link Promise}'s resolving value
+
* asynchronously and ends immediately after resolving.
+
*
+
* This helper will not handle the promise's exceptions, and will cause uncaught errors if the
+
* promise rejects without a value.
+
*
+
* @example
+
* An example printing `'resolved!'` when the given promise resolves after a tick.
+
*
+
* ```ts
+
* pipe(fromPromise(Promise.resolve('resolved!')), subscribe(console.log));
+
* ```
+
*/
export function fromPromise<T>(promise: Promise<T>): Source<T> {
return make(observer => {
promise.then(value => {
+161 -9
src/types.ts
···
-
/** A talkback signal is used to tell a [Source] that either the [Sink] is ready for new values or that the stream should be cancelled */
export const enum TalkbackKind {
Pull = 0,
Close = 1,
}
-
/** A talkback callback is sent to the sink with the [Start] signal to communicate signals back to the source. */
export type TalkbackFn = (signal: TalkbackKind) => void;
export type TeardownFn = () => void;
export const enum SignalKind {
Start = 0,
Push = 1,
End = 0,
}
export interface Tag<T> {
tag: T;
}
-
/** The start [Signal] is the first signal and carries a callback (talkback) so the sink can send signals to the source */
export type Start<_T> = Tag<SignalKind.Start> & [TalkbackFn];
-
/** The Push [Signal] carries new values to the sink, like in an event emitter */
export type Push<T> = Tag<SignalKind.Push> & [T];
-
/** A signal that communicates new events to a sink. */
export type Signal<T> = Start<T> | Push<T> | SignalKind.End;
-
/** A sink accepts new values from a [Source], like [Push], [Start], and an end signal. The [Start] is used to receive a callback to send talkback signals back to the source. */
export type Sink<T> = (signal: Signal<T>) => void;
-
/** A source is a function that accepts a [Sink] and then starts sending [Signal]s to it. */
export type Source<T> = (sink: Sink<T>) => void;
-
/** An operator transforms a [Source] and returns a new [Source], potentially with different timings or output types. */
export type Operator<In, Out> = (a: Source<In>) => Source<Out>;
-
/** Extracts the type of a given Source */
export type TypeOfSource<T> = T extends Source<infer U> ? U : never;
export interface Subscription {
unsubscribe(): void;
}
export interface Observer<T> {
next(value: T): void;
complete(): void;
}
export interface Subject<T> extends Observer<T> {
source: Source<T>;
}
···
+
/**
+
* Talkback signal that sends instructions from a sink to a source.
+
*
+
* @remarks
+
* This signal is issued via {@link TalkbackFn | talkback functions} that a {@link Sink} receives via
+
* the {@link Start} signal, to tell a {@link Source} to either send a new value (pulling) or stop
+
* sending values altogether (cancellation).
+
*/
export const enum TalkbackKind {
+
/** Instructs the {@link Source} to send the next value. */
Pull = 0,
+
/** Instructs the {@link Source} to stop sending values and cancels it. */
Close = 1,
}
+
/**
+
* Talkback callback that sends instructions to a source.
+
*
+
* @remarks
+
* This function sends a {@link TalkbackKind} signal to the source to instruct it to send a new value
+
* (pulling) or to be cancelled and stop sending values altogether.
+
*/
export type TalkbackFn = (signal: TalkbackKind) => void;
+
+
/**
+
* Callback that is called when a source is cancelled.
+
*
+
* @remarks
+
* This is used, in particular, in the {@link make | make Source} and is a returned function that is
+
* called when the {@link TalkbackKind.Close} signal is received by the source.
+
*/
export type TeardownFn = () => void;
+
/**
+
* Tag enum that is used to on signals that are sent from a source to a sink.
+
*
+
* @remarks
+
* This signal is issued by a {@link Source} and {@link Sink | Sinks} are called with it. The signals
+
* carrying values ({@link Start} and {@link Push}) are sent as a unary `[T]` tuple tagged with
+
* {@link Tag}. The {@link End} signal carries no value and is sent as a raw `0` value.
+
* @see {@link Start} for the data structure of the start signal.
+
* @see {@link Push} for the data structure of the push signal, carrying values.
+
*/
export const enum SignalKind {
+
/**
+
* Informs the {@link Sink} that it's being called by a {@link Source}.
+
*
+
* @remarks
+
* This starts the stream of values and carries a {@link TalkbackFn | talkback function} with it
+
* that is used by the {@link Sink} to communicate back to the {@link Source}.
+
* @see {@link Start} for the data structure of the signal.
+
*/
Start = 0,
+
/**
+
* Informs the {@link Sink} of a new values that's incoming from the {@link Source}.
+
*
+
* @remarks
+
* This informs the {@link Sink} of new values that are sent by the {@link Source}.
+
* @see {@link Push} for the data structure of the signal.
+
*/
Push = 1,
+
/**
+
* Informs the {@link Sink} that the {@link Source} has ended and that it won't send more values.
+
*
+
* @remarks
+
* This signal signifies that the stream has stopped and that no more values are expected. Some
+
* sources don't have a set end or limit on how many values will be sent. This signal is not sent
+
* when the {@link Source} is cancelled with a {@link TalkbackKind.Close | Close talkback signal}.
+
*/
End = 0,
}
+
/**
+
* The tag property that's put on unary `[T]` tuple to turn them into signals carrying values.
+
*
+
* @internal
+
*/
export interface Tag<T> {
tag: T;
}
+
/**
+
* Indicates the start of a stream to a {@link Sink}.
+
*
+
* @remarks
+
* This signal is sent from a {@link Source} to a {@link Sink} at the start of a stream to inform it
+
* that values can be pulled and/or will be sent. This signal carries a
+
* {@link TalkbackFn | talkback function} that is used by the {@link Sink} to communicate back to the
+
* {@link Source} as a callback. The talkback accepts {@link TalkbackKind.Pull | Pull} and
+
* {@link TalkbackKind.Close | Close} signals.
+
*/
export type Start<_T> = Tag<SignalKind.Start> & [TalkbackFn];
+
+
/**
+
* Sends a new value to a {@link Sink}.
+
*
+
* @remarks
+
* This signal is sent from a {@link Source} to a {@link Sink} to send a new value to it. This is
+
* essentially the signal that wraps new values coming in, like an event. Values are carried on
+
* unary tuples and can be accessed using `signal[0]`.
+
*/
export type Push<T> = Tag<SignalKind.Push> & [T];
+
/**
+
* Signals are sent from {@link Source | Sources} to {@link Sink | Sinks} to inform them of changes.
+
*
+
* @remarks
+
* A {@link Source}, when consumed, sends a sequence of events to {@link Sink | Sinks}. In order, a
+
* {@link SignalKind.Start | Start} signal will always be sent first, followed optionally by one or
+
* more {@link SignalKind.Push | Push signals}, carrying values and representing the stream. A
+
* {@link Source} will send the {@link SignalKind.End | End signal} when it runs out of values. The
+
* End signal will be omitted if the Source is cancelled by a
+
* {@link TalkbackKind.Close | Close signal}, sent back from the {@link Sink}.
+
* @see {@link SignalKind} for the kinds signals sent by {@link Source | Sources}.
+
* @see {@link Start} for the data structure of the start signal.
+
* @see {@link Push} for the data structure of the push signal.
+
*/
export type Signal<T> = Start<T> | Push<T> | SignalKind.End;
+
/**
+
* Callback function that is called by a {@link Source} with {@link Signal | Signals}.
+
*
+
* @remarks
+
* A Sink is a function that is called repeatedly with signals from a {@link Source}. It represents
+
* the receiver of the stream of signals/events coming from a {@link Source}.
+
* @see {@link Signal} for the data structure of signals.
+
*/
export type Sink<T> = (signal: Signal<T>) => void;
+
+
/** Factory function that calls {@link Sink | Sinks} with {@link Signal | Signals} when invoked.
+
* @remarks
+
* A Source is a factory function that when invoked with a {@link Sink}, calls it with
+
* {@link Signal | Signals} to create a stream of events, informing it of new values and the
+
* potential end of the stream of values. The first signal a Source sends is always a
+
* {@link Start | Start signal} that sends a talkback function to the {@link Sink}, so it may request
+
* new values or cancel the source.
+
*
+
* @see {@link Signal} for the data structure of signals.
+
* @see {@link Sink} for the data structure of sinks.
+
*/
export type Source<T> = (sink: Sink<T>) => void;
+
+
/** Transform function that accepts a {@link Source} and returns a new one.
+
* @remarks
+
* Wonka comes with several helper operators that transform a given {@link Source} into a new one,
+
* potentially changing its outputs, or the outputs' timing. An "operator" in Wonka typically
+
* accepts arguments and then returns this kind of function, so they can be chained and composed.
+
*
+
* @see {@link pipe | `pipe`} for the helper used to compose operators.
+
*/
export type Operator<In, Out> = (a: Source<In>) => Source<Out>;
+
/** Type utility to determine the type of a {@link Source}. */
export type TypeOfSource<T> = T extends Source<infer U> ? U : never;
+
/** Subscription object that can be used to cancel a {@link Source}.
+
* @see {@link subscribe | subscribe sink} for a helper that returns this structure.
+
*/
export interface Subscription {
+
/**
+
* Cancels a {@link Source} to stop the subscription from receiving new values.
+
*
+
* @see {@link TalkbackKind.Close | Close signal} This uses the {@link TalkbackFn | talkback function} to send a {@link TalkbackKind.Close | Close signal}
+
* to the subscribed-to {@link Source} to stop it from sending new values. This cleans up the subscription
+
* and ends it immediately.
+
*/
unsubscribe(): void;
}
+
/** An Observer represents sending signals manually to a {@link Sink}.
+
* @remarks
+
* The Observer is used whenever a utility allows for signals to be sent manually as a {@link Source}
+
* would send them.
+
*
+
* @see {@link make | `make` source} for a helper that uses this structure.
+
*/
export interface Observer<T> {
+
/** Sends a new value to the receiving Sink.
+
* @remarks
+
* This creates a {@link Push | Push signal} that is sent to a {@link Sink}.
+
*/
next(value: T): void;
+
/** Indicates to the receiving Sink that no more values will be sent.
+
* @remarks
+
* This creates an {@link SignalKind.End | End signal} that is sent to a {@link Sink}. The Observer
+
* will accept no more values via {@link Observer.next | `next` calls} once this method has been
+
* invoked.
+
*/
complete(): void;
}
+
/** Subjects combine a {@link Source} with the {@link Observer} that is used to send values on said Source.
+
* @remarks
+
* A Subject is used whenever an event hub-like structure is needed, as it both provides the
+
* {@link Observer}'s methods to send signals, as well as the `source` to receive said signals.
+
*
+
* @see {@link makeSubject | `makeSubject` source} for a helper that creates this structure.
+
*/
export interface Subject<T> extends Observer<T> {
+
/** The {@link Source} that issues the signals as the {@link Observer} methods are called. */
source: Source<T>;
}