import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types'; import { push, start, talkbackPlaceholder } from './helpers'; import { fromArray } from './sources'; const identity = (x: T): T => x; /** Buffers values and emits the array of bufferd values each time a `notifier` Source emits. * * @param notifier - A {@link Source} that releases the current buffer. * @returns An {@link Operator}. * * @remarks * `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source * emits, it will emit an array of all buffered values. * * This can be used to group values over time. A buffer will only be emitted when it contains any * values. * * @example * ```ts * pipe( * interval(50), * buffer(interval(100)), * subscribe(x => { * console.log(text); // logs: [0], [1, 2], [3, 4]... * }) * ); * ``` */ export function buffer(notifier: Source): Operator { return source => sink => { let buffer: T[] = []; let sourceTalkback = talkbackPlaceholder; let notifierTalkback = talkbackPlaceholder; let pulled = false; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; notifierTalkback(TalkbackKind.Close); if (buffer.length) sink(push(buffer)); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sourceTalkback = signal[0]; notifier(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sourceTalkback(TalkbackKind.Close); if (buffer.length) sink(push(buffer)); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { notifierTalkback = signal[0]; } else if (buffer.length) { const signal = push(buffer); buffer = []; sink(signal); } }); } else { buffer.push(signal[0]); if (!pulled) { pulled = true; sourceTalkback(TalkbackKind.Pull); notifierTalkback(TalkbackKind.Pull); } else { pulled = false; } } }); sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; sourceTalkback(TalkbackKind.Close); notifierTalkback(TalkbackKind.Close); } else if (!ended && !pulled) { pulled = true; sourceTalkback(TalkbackKind.Pull); notifierTalkback(TalkbackKind.Pull); } }) ); }; } /** Emits in order from the Sources returned by a mapping function per value of the Source. * * @param map - A function returning a {@link Source} per value. * @returns An {@link Operator}. * * @remarks * `concatMap` accepts a mapping function which must return a {@link Source} per value. * The output {@link Source} will emit values from each Source the function returned, in order, * queuing sources that aren't yet active. * * This can be used to issue multiple values per emission of an input {@link Source}, while keeping * the order of their outputs consistent. * * @example * ```ts * pipe( * fromArray([1, 2]), * concatMap(x => fromArray([x, x * 2])), * subscribe(x => { * console.log(text); // logs: 1, 2, 2, 4 * }) * ); * ``` */ export function concatMap(map: (value: In) => Source): Operator { return source => sink => { const inputQueue: In[] = []; let outerTalkback = talkbackPlaceholder; let innerTalkback = talkbackPlaceholder; let outerPulled = false; let innerPulled = false; let innerActive = false; let ended = false; function applyInnerSource(innerSource: Source): void { innerActive = true; innerSource(signal => { if (signal === SignalKind.End) { if (innerActive) { innerActive = false; if (inputQueue.length) { applyInnerSource(map(inputQueue.shift()!)); } else if (ended) { sink(SignalKind.End); } else if (!outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } } } else if (signal.tag === SignalKind.Start) { innerPulled = false; (innerTalkback = signal[0])(TalkbackKind.Pull); } else if (innerActive) { sink(signal); if (innerPulled) { innerPulled = false; } else { innerTalkback(TalkbackKind.Pull); } } }); } source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; if (!innerActive && !inputQueue.length) sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { outerTalkback = signal[0]; } else { outerPulled = false; if (innerActive) { inputQueue.push(signal[0]); } else { applyInnerSource(map(signal[0])); } } }); sink( start(signal => { if (signal === TalkbackKind.Close) { if (!ended) { ended = true; outerTalkback(TalkbackKind.Close); } if (innerActive) { innerActive = false; innerTalkback(TalkbackKind.Close); } } else { if (!ended && !outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } if (innerActive && !innerPulled) { innerPulled = true; innerTalkback(TalkbackKind.Pull); } } }) ); }; } /** Flattens a Source emitting Sources into a single Source emitting the inner values in order. * * @see {@link concatMap} which this helper uses and instead accept a mapping function. * @param source - An {@link Source} emitting {@link Source | Sources}. * @returns A {@link Source} emitting values from the inner Sources. * * @remarks * `concatAll` accepts a {@link Source} emitting {@link Source | Sources}. * The output {@link Source} will emit values from each Source, in order, queuing sources that * aren't yet active. * * @example * ```ts * pipe( * fromArray([ * fromArray([1, 2]), * fromArray([3, 4]), * ]), * concatAll, * subscribe(x => { * console.log(text); // logs: 1, 2, 3, 4 * }) * ); * ``` */ export function concatAll(source: Source>): Source { return concatMap, T>(identity)(source); } /** Emits values from the passed sources in order. * * @param sources - An array of {@link Source | Sources}. * @returns A {@link Source} emitting values from the input Sources. * * @remarks * `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting * with the first one and continuing to the next only when the prior source ended. * * This can be used to issue combine sources while keeping the order of their values intact. * * @example * ```ts * pipe( * concat([ * fromArray([1, 2]), * fromArray([3, 4]), * ]), * subscribe(x => { * console.log(text); // logs: 1, 2, 3, 4 * }) * ); * ``` */ export function concat(sources: Source[]): Source { return concatAll(fromArray(sources)); } /** Filters out emitted values for which the passed predicate function returns `false`. * * @param predicate - A function returning a boolean per value. * @returns An {@link Operator}. * * @remarks * `filter` will omit values from the {@link Source} for which the passed `predicate` function * returns `false`. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * filter(x => x % 2 === 0), * subscribe(x => { * console.log(text); // logs: 2 * }) * ); * ``` */ export function filter(predicate: (value: T) => boolean): Operator { return source => sink => { let talkback = talkbackPlaceholder; source(signal => { if (signal === SignalKind.End) { sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { talkback = signal[0]; sink(signal); } else if (!predicate(signal[0])) { talkback(TalkbackKind.Pull); } else { sink(signal); } }); }; } /** Maps emitted values using the passed mapping function. * * @param map - A function returning transforming the {@link Source | Source's} values. * @returns An {@link Operator}. * * @remarks * `map` accepts a transform function and calls it on each emitted value. It then emits * the values returned by the transform function instead. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * map(x => x * 2), * subscribe(x => { * console.log(text); // logs: 2, 4, 6 * }) * ); * ``` */ export function map(map: (value: In) => Out): Operator { return source => sink => source(signal => { if (signal === SignalKind.End || signal.tag === SignalKind.Start) { sink(signal); } else { sink(push(map(signal[0]))); } }); } /** Emits from the Sources returned by a mapping function per value of the Source. * * @param map - A function returning a {@link Source} per value. * @returns An {@link Operator}. * * @remarks * `mergeMap` accepts a mapping function which must return a {@link Source} per value. * The output {@link Source} will emit values from all {@link Source | Sources} the mapping function * returned. * * This can be used to issue multiple values per emission of an input {@link Source}, essentially * multiplexing all values to multiple Sources. * * @example * ```ts * pipe( * interval(50), * mergeMap(x => pipe( * fromValue(x), * delay(100) * )), * subscribe(x => { * console.log(text); // logs: 0, 1, 2... * }) * ); * ``` */ export function mergeMap(map: (value: In) => Source): Operator { return source => sink => { let innerTalkbacks: TalkbackFn[] = []; let outerTalkback = talkbackPlaceholder; let outerPulled = false; let ended = false; function applyInnerSource(innerSource: Source): void { let talkback = talkbackPlaceholder; innerSource(signal => { if (signal === SignalKind.End) { if (innerTalkbacks.length) { const index = innerTalkbacks.indexOf(talkback); if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1); if (!innerTalkbacks.length) { if (ended) { sink(SignalKind.End); } else if (!outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } } } } else if (signal.tag === SignalKind.Start) { innerTalkbacks.push((talkback = signal[0])); talkback(TalkbackKind.Pull); } else if (innerTalkbacks.length) { sink(signal); talkback(TalkbackKind.Pull); } }); } source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; if (!innerTalkbacks.length) sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { outerTalkback = signal[0]; } else { outerPulled = false; applyInnerSource(map(signal[0])); if (!outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } } }); sink( start(signal => { if (signal === TalkbackKind.Close) { if (!ended) { ended = true; outerTalkback(TalkbackKind.Close); } for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) a[i](TalkbackKind.Close); innerTalkbacks.length = 0; } else { if (!ended && !outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } else { outerPulled = false; } for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++) a[i](TalkbackKind.Pull); } }) ); }; } /** Flattens a Source emitting Sources into a single Source emitting the inner values. * * @see {@link mergeMap} which this helper uses and instead accept a mapping function. * @param source - An {@link Source} emitting {@link Source | Sources}. * @returns A {@link Source} emitting values from the inner Sources. * * @remarks * `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe * to each incoming source immediately and start passing its emitted values through. * * @example * ```ts * pipe( * fromArray([ * interval(50), * interval(100), * ]), * mergeAll, * subscribe(x => { * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 * }) * ); * ``` */ export function mergeAll(source: Source>): Source { return mergeMap, T>(identity)(source); } /** Emits values from the passed sources simultaneously. * * @param sources - An array of {@link Source | Sources}. * @returns A {@link Source} emitting values from the input Sources. * * @remarks * `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing * through all their emitted values simultaneously. * * This can be used to interleave the values of multiple sources. * * @example * ```ts * pipe( * merge([ * interval(50), * interval(100), * ]), * subscribe(x => { * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2 * }) * ); * ``` */ export function merge(sources: Source[]): Source { return mergeAll(fromArray(sources)); } /** Calls the passed callback function when the Source ends or is closed. * * @param callback - A function that is called when the {@link Source} ends. * @returns An {@link Operator}. * * @remarks * `onEnd` accepts a callback which is called when the {@link Source} either ends * or is closed. * * This operator can be used to add side-effects to a Source. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * take(1), * onEnd(() => { * console.log('end'); * }), * publish * ); * ``` */ export function onEnd(callback: () => void): Operator { return source => sink => { let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sink(SignalKind.End); callback(); } else if (signal.tag === SignalKind.Start) { const talkback = signal[0]; sink( start(signal => { if (signal === TalkbackKind.Close) { ended = true; talkback(TalkbackKind.Close); callback(); } else { talkback(signal); } }) ); } else { sink(signal); } }); }; } /** Calls the passed callback function when the Source emits a value. * * @param callback - A function that is called with each value the {@link Source} emits. * @returns An {@link Operator}. * * @remarks * `onPush` accepts a callback which is called for every emitted value of * the {@link Source}. * * This operator can be used to add side-effects to a Source. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * onPush(value => { * console.log(value); // logs: 1, 2, 3 * }), * publish * ); * ``` */ export function onPush(callback: (value: T) => void): Operator { return source => sink => { let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { const talkback = signal[0]; sink( start(signal => { if (signal === TalkbackKind.Close) ended = true; talkback(signal); }) ); } else { callback(signal[0]); sink(signal); } }); }; } /** Calls the passed callback function when the Source starts. * * @param callback - A function that is called when the {@link Source} is started. * @returns An {@link Operator}. * * @remarks * `onPush` accepts a callback which is called for every emitted value of * the {@link Source}. * * This operator can be used to add side-effects to a Source. * Specifically, it's useful to add a side-effect for a Source that triggers only once * the {@link Source} is used and started. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * onStart(() => { * console.log('start'); * }), * publish * ); * ``` */ export function onStart(callback: () => void): Operator { return source => sink => source(signal => { if (signal === SignalKind.End) { sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sink(signal); callback(); } else { sink(signal); } }); } /** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value. * * @param notifier - A {@link Source} that triggers the last value to be emitted. * @returns An {@link Operator}. * * @remarks * `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source * emits, it will emit the latest value. * * This is a back pressure operator that can be used to omit values from a {@link Source} coming in * too frequently. * * {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be * ignored. * * @example * ```ts * pipe( * interval(50), * sample(interval(100)), * subscribe(x => { * console.log(text); // logs: 0, 2, 4... * }) * ); * ``` */ export function sample(notifier: Source): Operator { return source => sink => { let sourceTalkback = talkbackPlaceholder; let notifierTalkback = talkbackPlaceholder; let value: T | void; let pulled = false; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; notifierTalkback(TalkbackKind.Close); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sourceTalkback = signal[0]; } else { value = signal[0]; if (!pulled) { pulled = true; notifierTalkback(TalkbackKind.Pull); sourceTalkback(TalkbackKind.Pull); } else { pulled = false; } } }); notifier(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sourceTalkback(TalkbackKind.Close); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { notifierTalkback = signal[0]; } else if (value !== undefined) { const signal = push(value); value = undefined; sink(signal); } }); sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; sourceTalkback(TalkbackKind.Close); notifierTalkback(TalkbackKind.Close); } else if (!ended && !pulled) { pulled = true; sourceTalkback(TalkbackKind.Pull); notifierTalkback(TalkbackKind.Pull); } }) ); }; } /** Maps emitted values using the passed reducer function. * * @param reducer - A function called with the last value by the `reducer` and the emitted value. * @param seed - The initial value that is passed to the `reducer`. * @returns An {@link Operator}. * * @remarks * `scan` accepts a reducer function and a seed value. The reducer will be called initially with the * seed value and the first emitted value. The {@link Source} will then emit the value returned by * the reducer function. Subsequently, the `reducer` is called with the last value the `reducer` * returned and the emitted value. * * This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits * each value of the reducer. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * scan((acc, x) => acc + x, 0), * subscribe(x => { * console.log(text); // logs: 1, 3, 6 * }) * ); * ``` */ export function scan(reducer: (acc: Out, value: In) => Out, seed: Out): Operator { return source => sink => { let acc = seed; source(signal => { if (signal === SignalKind.End) { sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sink(signal); } else { sink(push((acc = reducer(acc, signal[0])))); } }); }; } /** Shares one underlying subscription to the Source between all Sinks. * * @param source - A {@link Source} that should be shared. * @returns A shared {@link Source}. * * @remarks * `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it * will share one subscription to the input source. This allows side-effects on the input * {@link Source} to only be triggerd once. */ export function share(source: Source): Source { let sinks: Sink[] = []; let talkback = talkbackPlaceholder; let gotSignal = false; return sink => { sinks.push(sink); if (sinks.length === 1) { source(signal => { if (signal === SignalKind.End) { for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End); sinks.length = 0; } else if (signal.tag === SignalKind.Start) { talkback = signal[0]; } else { gotSignal = false; for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal); } }); } sink( start(signal => { if (signal === TalkbackKind.Close) { const index = sinks.indexOf(sink); if (index > -1) (sinks = sinks.slice()).splice(index, 1); if (!sinks.length) talkback(TalkbackKind.Close); } else if (!gotSignal) { gotSignal = true; talkback(TalkbackKind.Pull); } }) ); }; } /** Omits `wait` amount of values from the Source and then runs as usual. * * @param wait - The number of values to be omitted. * @returns An {@link Operator}. * * @remarks * `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards. * This essentially skips a given number of values on the input {@link Source}. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * skip(2), * subscribe(x => { * console.log(text); // logs: 3 * }) * ); * ``` */ export function skip(wait: number): Operator { return source => sink => { let talkback = talkbackPlaceholder; let rest = wait; source(signal => { if (signal === SignalKind.End) { sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { talkback = signal[0]; sink(signal); } else if (rest-- > 0) { talkback(TalkbackKind.Pull); } else { sink(signal); } }); }; } /** Omits values from an input Source until a notifier Source emits a value. * * @param notifier - A {@link Source} that starts the operator's sent values. * @returns An {@link Operator}. * * @remarks * `skipUntil` will omit all values from the input {@link Source} until the `notifier` * Source emits a value of its own. It'll then start passing values from the Source through. * * @example * ```ts * pipe( * interval(50), * skipUntil(interval(150)), * subscribe(x => { * console.log(text); // logs: 2, 3... * }) * ); * ``` */ export function skipUntil(notifier: Source): Operator { return source => sink => { let sourceTalkback = talkbackPlaceholder; let notifierTalkback = talkbackPlaceholder; let skip = true; let pulled = false; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; if (skip) notifierTalkback(TalkbackKind.Close); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sourceTalkback = signal[0]; notifier(signal => { if (signal === SignalKind.End) { if (skip) { ended = true; sourceTalkback(TalkbackKind.Close); } } else if (signal.tag === SignalKind.Start) { (notifierTalkback = signal[0])(TalkbackKind.Pull); } else { skip = false; notifierTalkback(TalkbackKind.Close); } }); } else if (!skip) { pulled = false; sink(signal); } else if (!pulled) { pulled = true; sourceTalkback(TalkbackKind.Pull); notifierTalkback(TalkbackKind.Pull); } else { pulled = false; } }); sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; sourceTalkback(TalkbackKind.Close); if (skip) notifierTalkback(TalkbackKind.Close); } else if (!ended && !pulled) { pulled = true; if (skip) notifierTalkback(TalkbackKind.Pull); sourceTalkback(TalkbackKind.Pull); } }) ); }; } /** Omits values from an input Source until a predicate function returns `false`. * * @param predicate - A function returning a boolean per value. * @returns An {@link Operator}. * * @remarks * `skipWhile` will omit all values from the input {@link Source} until the `predicate` * function returns `false`. When the `predicate` function returns `false`, the Source's values will * be passed through. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * skipWhile(x => x < 2), * subscribe(x => { * console.log(text); // logs: 2, 3 * }) * ); * ``` */ export function skipWhile(predicate: (value: T) => boolean): Operator { return source => sink => { let talkback = talkbackPlaceholder; let skip = true; source(signal => { if (signal === SignalKind.End) { sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { talkback = signal[0]; sink(signal); } else if (skip) { if (predicate(signal[0])) { talkback(TalkbackKind.Pull); } else { skip = false; sink(signal); } } else { sink(signal); } }); }; } /** Emits from the latest Source returned by a mapping function per value of the Source. * * @param map - A function returning a {@link Source} per value. * @returns An {@link Operator}. * * @remarks * `switchMap` accepts a mapping function which must return a {@link Source} per value. * The output {@link Source} will emit values from the latest Source the mapping function * returned. If a value is emitted while the last returned Source is still active, the prior Source * will be closed. * * This can be used to issue multiple values per emission of an input {@link Source}, while only * letting one of these sub-Sources be active at a time. * * @example * ```ts * pipe( * interval(100), * switchMap(() => interval(50)), * subscribe(x => { * console.log(text); // logs: 0, 0, 0... * }) * ); * ``` */ export function switchMap(map: (value: In) => Source): Operator { return source => sink => { let outerTalkback = talkbackPlaceholder; let innerTalkback = talkbackPlaceholder; let outerPulled = false; let innerPulled = false; let innerActive = false; let ended = false; function applyInnerSource(innerSource: Source): void { innerActive = true; innerSource(signal => { if (!innerActive) { /*noop*/ } else if (signal === SignalKind.End) { innerActive = false; if (ended) { sink(SignalKind.End); } else if (!outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } } else if (signal.tag === SignalKind.Start) { innerPulled = false; (innerTalkback = signal[0])(TalkbackKind.Pull); } else { sink(signal); if (!innerPulled) { innerTalkback(TalkbackKind.Pull); } else { innerPulled = false; } } }); } source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; if (!innerActive) sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { outerTalkback = signal[0]; } else { if (innerActive) { innerTalkback(TalkbackKind.Close); innerTalkback = talkbackPlaceholder; } if (!outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } else { outerPulled = false; } applyInnerSource(map(signal[0])); } }); sink( start(signal => { if (signal === TalkbackKind.Close) { if (!ended) { ended = true; outerTalkback(TalkbackKind.Close); } if (innerActive) { innerActive = false; innerTalkback(TalkbackKind.Close); } } else { if (!ended && !outerPulled) { outerPulled = true; outerTalkback(TalkbackKind.Pull); } if (innerActive && !innerPulled) { innerPulled = true; innerTalkback(TalkbackKind.Pull); } } }) ); }; } /** Flattens a Source emitting Sources into a single Source emitting the inner values. * * @see {@link switchMap} which this helper uses and instead accept a mapping function. * @param source - An {@link Source} emitting {@link Source | Sources}. * @returns A {@link Source} emitting values from the inner Sources. * * @remarks * `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it * receives a {@link Source} it will close its prior subscription and subscribe to the new Source * instead, passing through its values. * * @example * ```ts * pipe( * interval(100), * map(() => interval(50)), * switchAll, * subscribe(x => { * console.log(text); // logs: 0, 0, 0... * }) * ); * ``` */ export function switchAll(source: Source>): Source { return switchMap, T>(identity)(source); } /** Emits `max` values from the Source and then ends. * * @param max - The maximum number of values emitted. * @returns An {@link Operator}. * * @remarks * `take` will issue all values as normal until the `max` number of emitted values has been reached. * It will then end and close the {@link Source}. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * take(2), * subscribe(x => { * console.log(text); // logs: 1, 2 * }) * ); * ``` */ export function take(max: number): Operator { return source => sink => { let talkback = talkbackPlaceholder; let ended = false; let taken = 0; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { if (max <= 0) { ended = true; sink(SignalKind.End); signal[0](TalkbackKind.Close); } else { talkback = signal[0]; } } else if (taken++ < max) { sink(signal); if (!ended && taken >= max) { ended = true; sink(SignalKind.End); talkback(TalkbackKind.Close); } } else { sink(signal); } }); sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; talkback(TalkbackKind.Close); } else if (signal === TalkbackKind.Pull && !ended && taken < max) { talkback(TalkbackKind.Pull); } }) ); }; } /** Buffers the `max` last values of the Source and emits them once the Source ends. * * @param max - The maximum number of values buffered. * @returns An {@link Operator}. * * @remarks * `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It * will only emit values stored in the buffer once the {@link Source} ends. * * All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would * synchronously. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * takeLast(1), * subscribe(x => { * console.log(text); // logs: 3 * }) * ); * ``` */ export function takeLast(max: number): Operator { return source => sink => { const queue: T[] = []; let talkback = talkbackPlaceholder; source(signal => { if (signal === SignalKind.End) { fromArray(queue)(sink); } else if (signal.tag === SignalKind.Start) { if (max <= 0) { signal[0](TalkbackKind.Close); fromArray(queue)(sink); } else { (talkback = signal[0])(TalkbackKind.Pull); } } else { if (queue.length >= max && max) queue.shift(); queue.push(signal[0]); talkback(TalkbackKind.Pull); } }); }; } /** Takes values from an input Source until a notifier Source emits a value. * * @param notifier - A {@link Source} that stops the operator's sent values. * @returns An {@link Operator}. * * @remarks * `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier` * Source emits a value of its own. It'll then close the {@link Source}. * * @example * ```ts * pipe( * interval(50), * takeUntil(interval(150)), * subscribe(x => { * console.log(text); // logs: 0, 1 * }) * ); * ``` */ export function takeUntil(notifier: Source): Operator { return source => sink => { let sourceTalkback = talkbackPlaceholder; let notifierTalkback = talkbackPlaceholder; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; notifierTalkback(TalkbackKind.Close); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { sourceTalkback = signal[0]; notifier(signal => { if (signal === SignalKind.End) { /*noop*/ } else if (signal.tag === SignalKind.Start) { (notifierTalkback = signal[0])(TalkbackKind.Pull); } else { ended = true; notifierTalkback(TalkbackKind.Close); sourceTalkback(TalkbackKind.Close); sink(SignalKind.End); } }); } else { sink(signal); } }); sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; sourceTalkback(TalkbackKind.Close); notifierTalkback(TalkbackKind.Close); } else if (!ended) { sourceTalkback(TalkbackKind.Pull); } }) ); }; } /** Takes values from an input Source until a predicate function returns `false`. * * @param predicate - A function returning a boolean per value. * @returns An {@link Operator}. * * @remarks * `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate` * function returns `false`. When the `predicate` function returns `false`, the current value is * omitted and the {@link Source} is closed. * * @example * ```ts * pipe( * fromArray([1, 2, 3]), * takeWhile(x => x < 2), * subscribe(x => { * console.log(text); // logs: 1 * }) * ); * ``` */ export function takeWhile(predicate: (value: T) => boolean): Operator { return source => sink => { let talkback = talkbackPlaceholder; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { talkback = signal[0]; sink(signal); } else if (!predicate(signal[0])) { ended = true; sink(SignalKind.End); talkback(TalkbackKind.Close); } else { sink(signal); } }); }; } /** Debounces a Source by omitting values until a given timeframe has passed. * * @param timing - A function returning a debounce time (ms) per emitted value. * @returns An {@link Operator}. * * @remarks * `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted * value. All emitted values issued by the {@link Source} during the returned time will be omitted * until the time has passed. * * Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a * value is let through. * * This is a back pressure operator that can be used to omit values from a {@link Source} coming in * too frequently. * * @example * ```ts * pipe( * interval(50), * debounce(() => 100), * subscribe(x => { * console.log(text); // never logs any value * }) * ); * ``` */ export function debounce(timing: (value: T) => number): Operator { return source => sink => { let id: any | void; let deferredEnded = false; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; if (id) { deferredEnded = true; } else { sink(SignalKind.End); } } else if (signal.tag === SignalKind.Start) { const talkback = signal[0]; sink( start(signal => { if (signal === TalkbackKind.Close && !ended) { ended = true; deferredEnded = false; if (id) clearTimeout(id); talkback(TalkbackKind.Close); } else if (!ended) { talkback(TalkbackKind.Pull); } }) ); } else { if (id) clearTimeout(id); id = setTimeout(() => { id = undefined; sink(signal); if (deferredEnded) sink(SignalKind.End); }, timing(signal[0])); } }); }; } /** Delays each signal emitted by a Source by given time (ms). * * @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed. * @returns An {@link Operator}. * * @remarks * `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by. * This will create a timeout per received signal and delay the emitted values accordingly. * * Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to * be ordered. Otherwise, signals will arrive in the wrong order at the sink. */ export function delay(wait: number): Operator { return source => sink => { let active = 0; source(signal => { if (signal !== SignalKind.End && signal.tag === SignalKind.Start) { sink(signal); } else { active++; setTimeout(() => { if (active) { active--; sink(signal); } }, wait); } }); }; } /** Throttles a Source by omitting values that are emitted before a given timeout. * * @param timing - A function returning a throttle time (ms) per emitted value. * @returns An {@link Operator}. * * @remarks * `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted * value. During the returned timeframe all values issued by the {@link Source} will be omitted and * dropped. * * This is a back pressure operator that can be used to omit values from a {@link Source} coming in * too frequently. * * @example * ```ts * pipe( * interval(50), * throttle(() => 100), * subscribe(x => { * // omits every second value: 0, 2, 4... * console.log(text); * }) * ); * ``` */ export function throttle(timing: (value: T) => number): Operator { return source => sink => { let skip = false; let id: any | void; source(signal => { if (signal === SignalKind.End) { if (id) clearTimeout(id); sink(SignalKind.End); } else if (signal.tag === SignalKind.Start) { const talkback = signal[0]; sink( start(signal => { if (signal === TalkbackKind.Close) { if (id) clearTimeout(id); talkback(TalkbackKind.Close); } else { talkback(TalkbackKind.Pull); } }) ); } else if (!skip) { skip = true; if (id) clearTimeout(id); id = setTimeout(() => { id = undefined; skip = false; }, timing(signal[0])); sink(signal); } }); }; } export { mergeAll as flatten, onPush as tap };