1import { Push, Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3import { fromArray } from './sources';
4
5const identity = <T>(x: T): T => x;
6
7/** Buffers values and emits the array of bufferd values each time a `notifier` Source emits.
8 *
9 * @param notifier - A {@link Source} that releases the current buffer.
10 * @returns An {@link Operator}.
11 *
12 * @remarks
13 * `buffer` will buffer values from the input {@link Source}. When the passed `notifier` Source
14 * emits, it will emit an array of all buffered values.
15 *
16 * This can be used to group values over time. A buffer will only be emitted when it contains any
17 * values.
18 *
19 * @example
20 * ```ts
21 * pipe(
22 * interval(50),
23 * buffer(interval(100)),
24 * subscribe(x => {
25 * console.log(text); // logs: [0], [1, 2], [3, 4]...
26 * })
27 * );
28 * ```
29 */
30export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
31 return source => sink => {
32 let buffer: T[] = [];
33 let sourceTalkback = talkbackPlaceholder;
34 let notifierTalkback = talkbackPlaceholder;
35 let pulled = false;
36 let ended = false;
37 source(signal => {
38 if (ended) {
39 /*noop*/
40 } else if (signal === SignalKind.End) {
41 ended = true;
42 notifierTalkback(TalkbackKind.Close);
43 if (buffer.length) sink(push(buffer));
44 sink(SignalKind.End);
45 } else if (signal.tag === SignalKind.Start) {
46 sourceTalkback = signal[0];
47 notifier(signal => {
48 if (ended) {
49 /*noop*/
50 } else if (signal === SignalKind.End) {
51 ended = true;
52 sourceTalkback(TalkbackKind.Close);
53 if (buffer.length) sink(push(buffer));
54 sink(SignalKind.End);
55 } else if (signal.tag === SignalKind.Start) {
56 notifierTalkback = signal[0];
57 } else if (buffer.length) {
58 const signal = push(buffer);
59 buffer = [];
60 sink(signal);
61 }
62 });
63 } else {
64 buffer.push(signal[0]);
65 if (!pulled) {
66 pulled = true;
67 sourceTalkback(TalkbackKind.Pull);
68 notifierTalkback(TalkbackKind.Pull);
69 } else {
70 pulled = false;
71 }
72 }
73 });
74 sink(
75 start(signal => {
76 if (signal === TalkbackKind.Close && !ended) {
77 ended = true;
78 sourceTalkback(TalkbackKind.Close);
79 notifierTalkback(TalkbackKind.Close);
80 } else if (!ended && !pulled) {
81 pulled = true;
82 sourceTalkback(TalkbackKind.Pull);
83 notifierTalkback(TalkbackKind.Pull);
84 }
85 })
86 );
87 };
88}
89
90/** Emits in order from the Sources returned by a mapping function per value of the Source.
91 *
92 * @param map - A function returning a {@link Source} per value.
93 * @returns An {@link Operator}.
94 *
95 * @remarks
96 * `concatMap` accepts a mapping function which must return a {@link Source} per value.
97 * The output {@link Source} will emit values from each Source the function returned, in order,
98 * queuing sources that aren't yet active.
99 *
100 * This can be used to issue multiple values per emission of an input {@link Source}, while keeping
101 * the order of their outputs consistent.
102 *
103 * @example
104 * ```ts
105 * pipe(
106 * fromArray([1, 2]),
107 * concatMap(x => fromArray([x, x * 2])),
108 * subscribe(x => {
109 * console.log(text); // logs: 1, 2, 2, 4
110 * })
111 * );
112 * ```
113 */
114export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
115 return source => sink => {
116 const inputQueue: In[] = [];
117 let outerTalkback = talkbackPlaceholder;
118 let innerTalkback = talkbackPlaceholder;
119 let outerPulled = false;
120 let innerPulled = false;
121 let innerActive = false;
122 let ended = false;
123 function applyInnerSource(innerSource: Source<Out>): void {
124 innerActive = true;
125 innerSource(signal => {
126 if (signal === SignalKind.End) {
127 if (innerActive) {
128 innerActive = false;
129 if (inputQueue.length) {
130 applyInnerSource(map(inputQueue.shift()!));
131 } else if (ended) {
132 sink(SignalKind.End);
133 } else if (!outerPulled) {
134 outerPulled = true;
135 outerTalkback(TalkbackKind.Pull);
136 }
137 }
138 } else if (signal.tag === SignalKind.Start) {
139 innerPulled = false;
140 (innerTalkback = signal[0])(TalkbackKind.Pull);
141 } else if (innerActive) {
142 sink(signal);
143 if (innerPulled) {
144 innerPulled = false;
145 } else {
146 innerTalkback(TalkbackKind.Pull);
147 }
148 }
149 });
150 }
151 source(signal => {
152 if (ended) {
153 /*noop*/
154 } else if (signal === SignalKind.End) {
155 ended = true;
156 if (!innerActive && !inputQueue.length) sink(SignalKind.End);
157 } else if (signal.tag === SignalKind.Start) {
158 outerTalkback = signal[0];
159 } else {
160 outerPulled = false;
161 if (innerActive) {
162 inputQueue.push(signal[0]);
163 } else {
164 applyInnerSource(map(signal[0]));
165 }
166 }
167 });
168 sink(
169 start(signal => {
170 if (signal === TalkbackKind.Close) {
171 if (!ended) {
172 ended = true;
173 outerTalkback(TalkbackKind.Close);
174 }
175 if (innerActive) {
176 innerActive = false;
177 innerTalkback(TalkbackKind.Close);
178 }
179 } else {
180 if (!ended && !outerPulled) {
181 outerPulled = true;
182 outerTalkback(TalkbackKind.Pull);
183 }
184 if (innerActive && !innerPulled) {
185 innerPulled = true;
186 innerTalkback(TalkbackKind.Pull);
187 }
188 }
189 })
190 );
191 };
192}
193
194/** Flattens a Source emitting Sources into a single Source emitting the inner values in order.
195 *
196 * @see {@link concatMap} which this helper uses and instead accept a mapping function.
197 * @param source - An {@link Source} emitting {@link Source | Sources}.
198 * @returns A {@link Source} emitting values from the inner Sources.
199 *
200 * @remarks
201 * `concatAll` accepts a {@link Source} emitting {@link Source | Sources}.
202 * The output {@link Source} will emit values from each Source, in order, queuing sources that
203 * aren't yet active.
204 *
205 * @example
206 * ```ts
207 * pipe(
208 * fromArray([
209 * fromArray([1, 2]),
210 * fromArray([3, 4]),
211 * ]),
212 * concatAll,
213 * subscribe(x => {
214 * console.log(text); // logs: 1, 2, 3, 4
215 * })
216 * );
217 * ```
218 */
219export function concatAll<T>(source: Source<Source<T>>): Source<T> {
220 return concatMap<Source<T>, T>(identity)(source);
221}
222
223/** Emits values from the passed sources in order.
224 *
225 * @param sources - An array of {@link Source | Sources}.
226 * @returns A {@link Source} emitting values from the input Sources.
227 *
228 * @remarks
229 * `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting
230 * with the first one and continuing to the next only when the prior source ended.
231 *
232 * This can be used to issue combine sources while keeping the order of their values intact.
233 *
234 * @example
235 * ```ts
236 * pipe(
237 * concat([
238 * fromArray([1, 2]),
239 * fromArray([3, 4]),
240 * ]),
241 * subscribe(x => {
242 * console.log(text); // logs: 1, 2, 3, 4
243 * })
244 * );
245 * ```
246 */
247export function concat<T>(sources: Source<T>[]): Source<T> {
248 return concatAll(fromArray(sources));
249}
250
251/** Filters out emitted values for which the passed predicate function returns `false`.
252 *
253 * @param predicate - A function returning a boolean per value.
254 * @returns An {@link Operator}.
255 *
256 * @remarks
257 * `filter` will omit values from the {@link Source} for which the passed `predicate` function
258 * returns `false`.
259 *
260 * @example
261 * ```ts
262 * pipe(
263 * fromArray([1, 2, 3]),
264 * filter(x => x % 2 === 0),
265 * subscribe(x => {
266 * console.log(text); // logs: 2
267 * })
268 * );
269 * ```
270 */
271function filter<In, Out extends In>(predicate: (value: In) => value is Out): Operator<In, Out>;
272function filter<T>(predicate: (value: T) => boolean): Operator<T, T>;
273function filter<In, Out>(predicate: (value: In) => boolean): Operator<In, Out> {
274 return source => sink => {
275 let talkback = talkbackPlaceholder;
276 source(signal => {
277 if (signal === SignalKind.End) {
278 sink(SignalKind.End);
279 } else if (signal.tag === SignalKind.Start) {
280 talkback = signal[0];
281 sink(signal);
282 } else if (!predicate(signal[0])) {
283 talkback(TalkbackKind.Pull);
284 } else {
285 sink(signal as Push<any>);
286 }
287 });
288 };
289}
290
291export { filter };
292
293/** Maps emitted values using the passed mapping function.
294 *
295 * @param map - A function returning transforming the {@link Source | Source's} values.
296 * @returns An {@link Operator}.
297 *
298 * @remarks
299 * `map` accepts a transform function and calls it on each emitted value. It then emits
300 * the values returned by the transform function instead.
301 *
302 * @example
303 * ```ts
304 * pipe(
305 * fromArray([1, 2, 3]),
306 * map(x => x * 2),
307 * subscribe(x => {
308 * console.log(text); // logs: 2, 4, 6
309 * })
310 * );
311 * ```
312 */
313export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
314 return source => sink =>
315 source(signal => {
316 if (signal === SignalKind.End || signal.tag === SignalKind.Start) {
317 sink(signal);
318 } else {
319 sink(push(map(signal[0])));
320 }
321 });
322}
323
324/** Emits from the Sources returned by a mapping function per value of the Source.
325 *
326 * @param map - A function returning a {@link Source} per value.
327 * @returns An {@link Operator}.
328 *
329 * @remarks
330 * `mergeMap` accepts a mapping function which must return a {@link Source} per value.
331 * The output {@link Source} will emit values from all {@link Source | Sources} the mapping function
332 * returned.
333 *
334 * This can be used to issue multiple values per emission of an input {@link Source}, essentially
335 * multiplexing all values to multiple Sources.
336 *
337 * @example
338 * ```ts
339 * pipe(
340 * interval(50),
341 * mergeMap(x => pipe(
342 * fromValue(x),
343 * delay(100)
344 * )),
345 * subscribe(x => {
346 * console.log(text); // logs: 0, 1, 2...
347 * })
348 * );
349 * ```
350 */
351export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
352 return source => sink => {
353 let innerTalkbacks: TalkbackFn[] = [];
354 let outerTalkback = talkbackPlaceholder;
355 let outerPulled = false;
356 let ended = false;
357 function applyInnerSource(innerSource: Source<Out>): void {
358 let talkback = talkbackPlaceholder;
359 innerSource(signal => {
360 if (signal === SignalKind.End) {
361 if (innerTalkbacks.length) {
362 const index = innerTalkbacks.indexOf(talkback);
363 if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1);
364 if (!innerTalkbacks.length) {
365 if (ended) {
366 sink(SignalKind.End);
367 } else if (!outerPulled) {
368 outerPulled = true;
369 outerTalkback(TalkbackKind.Pull);
370 }
371 }
372 }
373 } else if (signal.tag === SignalKind.Start) {
374 innerTalkbacks.push((talkback = signal[0]));
375 talkback(TalkbackKind.Pull);
376 } else if (innerTalkbacks.length) {
377 sink(signal);
378 talkback(TalkbackKind.Pull);
379 }
380 });
381 }
382 source(signal => {
383 if (ended) {
384 /*noop*/
385 } else if (signal === SignalKind.End) {
386 ended = true;
387 if (!innerTalkbacks.length) sink(SignalKind.End);
388 } else if (signal.tag === SignalKind.Start) {
389 outerTalkback = signal[0];
390 } else {
391 outerPulled = false;
392 applyInnerSource(map(signal[0]));
393 if (!outerPulled) {
394 outerPulled = true;
395 outerTalkback(TalkbackKind.Pull);
396 }
397 }
398 });
399 sink(
400 start(signal => {
401 if (signal === TalkbackKind.Close) {
402 if (!ended) {
403 ended = true;
404 outerTalkback(TalkbackKind.Close);
405 }
406 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
407 a[i](TalkbackKind.Close);
408 innerTalkbacks.length = 0;
409 } else {
410 if (!ended && !outerPulled) {
411 outerPulled = true;
412 outerTalkback(TalkbackKind.Pull);
413 } else {
414 outerPulled = false;
415 }
416 for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
417 a[i](TalkbackKind.Pull);
418 }
419 })
420 );
421 };
422}
423
424/** Flattens a Source emitting Sources into a single Source emitting the inner values.
425 *
426 * @see {@link mergeMap} which this helper uses and instead accept a mapping function.
427 * @param source - An {@link Source} emitting {@link Source | Sources}.
428 * @returns A {@link Source} emitting values from the inner Sources.
429 *
430 * @remarks
431 * `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It will subscribe
432 * to each incoming source immediately and start passing its emitted values through.
433 *
434 * @example
435 * ```ts
436 * pipe(
437 * fromArray([
438 * interval(50),
439 * interval(100),
440 * ]),
441 * mergeAll,
442 * subscribe(x => {
443 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
444 * })
445 * );
446 * ```
447 */
448export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
449 return mergeMap<Source<T>, T>(identity)(source);
450}
451
452/** Emits values from the passed sources simultaneously.
453 *
454 * @param sources - An array of {@link Source | Sources}.
455 * @returns A {@link Source} emitting values from the input Sources.
456 *
457 * @remarks
458 * `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing
459 * through all their emitted values simultaneously.
460 *
461 * This can be used to interleave the values of multiple sources.
462 *
463 * @example
464 * ```ts
465 * pipe(
466 * merge([
467 * interval(50),
468 * interval(100),
469 * ]),
470 * subscribe(x => {
471 * console.log(text); // logs: 0, 0, 1, 2, 1, 3, 4, 2
472 * })
473 * );
474 * ```
475 */
476export function merge<T>(sources: Source<T>[]): Source<T> {
477 return mergeAll(fromArray(sources));
478}
479
480/** Calls the passed callback function when the Source ends or is closed.
481 *
482 * @param callback - A function that is called when the {@link Source} ends.
483 * @returns An {@link Operator}.
484 *
485 * @remarks
486 * `onEnd` accepts a callback which is called when the {@link Source} either ends
487 * or is closed.
488 *
489 * This operator can be used to add side-effects to a Source.
490 *
491 * @example
492 * ```ts
493 * pipe(
494 * fromArray([1, 2, 3]),
495 * take(1),
496 * onEnd(() => {
497 * console.log('end');
498 * }),
499 * publish
500 * );
501 * ```
502 */
503export function onEnd<T>(callback: () => void): Operator<T, T> {
504 return source => sink => {
505 let ended = false;
506 source(signal => {
507 if (ended) {
508 /*noop*/
509 } else if (signal === SignalKind.End) {
510 ended = true;
511 sink(SignalKind.End);
512 callback();
513 } else if (signal.tag === SignalKind.Start) {
514 const talkback = signal[0];
515 sink(
516 start(signal => {
517 if (signal === TalkbackKind.Close) {
518 ended = true;
519 talkback(TalkbackKind.Close);
520 callback();
521 } else {
522 talkback(signal);
523 }
524 })
525 );
526 } else {
527 sink(signal);
528 }
529 });
530 };
531}
532
533/** Calls the passed callback function when the Source emits a value.
534 *
535 * @param callback - A function that is called with each value the {@link Source} emits.
536 * @returns An {@link Operator}.
537 *
538 * @remarks
539 * `onPush` accepts a callback which is called for every emitted value of
540 * the {@link Source}.
541 *
542 * This operator can be used to add side-effects to a Source.
543 *
544 * @example
545 * ```ts
546 * pipe(
547 * fromArray([1, 2, 3]),
548 * onPush(value => {
549 * console.log(value); // logs: 1, 2, 3
550 * }),
551 * publish
552 * );
553 * ```
554 */
555export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
556 return source => sink => {
557 let ended = false;
558 source(signal => {
559 if (ended) {
560 /*noop*/
561 } else if (signal === SignalKind.End) {
562 ended = true;
563 sink(SignalKind.End);
564 } else if (signal.tag === SignalKind.Start) {
565 const talkback = signal[0];
566 sink(
567 start(signal => {
568 if (signal === TalkbackKind.Close) ended = true;
569 talkback(signal);
570 })
571 );
572 } else {
573 callback(signal[0]);
574 sink(signal);
575 }
576 });
577 };
578}
579
580/** Calls the passed callback function when the Source starts.
581 *
582 * @param callback - A function that is called when the {@link Source} is started.
583 * @returns An {@link Operator}.
584 *
585 * @remarks
586 * `onPush` accepts a callback which is called for every emitted value of
587 * the {@link Source}.
588 *
589 * This operator can be used to add side-effects to a Source.
590 * Specifically, it's useful to add a side-effect for a Source that triggers only once
591 * the {@link Source} is used and started.
592 *
593 * @example
594 * ```ts
595 * pipe(
596 * fromArray([1, 2, 3]),
597 * onStart(() => {
598 * console.log('start');
599 * }),
600 * publish
601 * );
602 * ```
603 */
604export function onStart<T>(callback: () => void): Operator<T, T> {
605 return source => sink =>
606 source(signal => {
607 if (signal === SignalKind.End) {
608 sink(SignalKind.End);
609 } else if (signal.tag === SignalKind.Start) {
610 sink(signal);
611 callback();
612 } else {
613 sink(signal);
614 }
615 });
616}
617
618/** Emits the last value the {@link Source} emitted, whenever the notifier Source emits a value.
619 *
620 * @param notifier - A {@link Source} that triggers the last value to be emitted.
621 * @returns An {@link Operator}.
622 *
623 * @remarks
624 * `sample` will store the latest value the {@link Source} emitted. Every time the `notifier` Source
625 * emits, it will emit the latest value.
626 *
627 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
628 * too frequently.
629 *
630 * {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be
631 * ignored.
632 *
633 * @example
634 * ```ts
635 * pipe(
636 * interval(50),
637 * sample(interval(100)),
638 * subscribe(x => {
639 * console.log(text); // logs: 0, 2, 4...
640 * })
641 * );
642 * ```
643 */
644export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
645 return source => sink => {
646 let sourceTalkback = talkbackPlaceholder;
647 let notifierTalkback = talkbackPlaceholder;
648 let value: T | void;
649 let pulled = false;
650 let ended = false;
651 source(signal => {
652 if (ended) {
653 /*noop*/
654 } else if (signal === SignalKind.End) {
655 ended = true;
656 notifierTalkback(TalkbackKind.Close);
657 sink(SignalKind.End);
658 } else if (signal.tag === SignalKind.Start) {
659 sourceTalkback = signal[0];
660 } else {
661 value = signal[0];
662 if (!pulled) {
663 pulled = true;
664 notifierTalkback(TalkbackKind.Pull);
665 sourceTalkback(TalkbackKind.Pull);
666 } else {
667 pulled = false;
668 }
669 }
670 });
671 notifier(signal => {
672 if (ended) {
673 /*noop*/
674 } else if (signal === SignalKind.End) {
675 ended = true;
676 sourceTalkback(TalkbackKind.Close);
677 sink(SignalKind.End);
678 } else if (signal.tag === SignalKind.Start) {
679 notifierTalkback = signal[0];
680 } else if (value !== undefined) {
681 const signal = push(value);
682 value = undefined;
683 sink(signal);
684 }
685 });
686 sink(
687 start(signal => {
688 if (signal === TalkbackKind.Close && !ended) {
689 ended = true;
690 sourceTalkback(TalkbackKind.Close);
691 notifierTalkback(TalkbackKind.Close);
692 } else if (!ended && !pulled) {
693 pulled = true;
694 sourceTalkback(TalkbackKind.Pull);
695 notifierTalkback(TalkbackKind.Pull);
696 }
697 })
698 );
699 };
700}
701
702/** Maps emitted values using the passed reducer function.
703 *
704 * @param reducer - A function called with the last value by the `reducer` and the emitted value.
705 * @param seed - The initial value that is passed to the `reducer`.
706 * @returns An {@link Operator}.
707 *
708 * @remarks
709 * `scan` accepts a reducer function and a seed value. The reducer will be called initially with the
710 * seed value and the first emitted value. The {@link Source} will then emit the value returned by
711 * the reducer function. Subsequently, the `reducer` is called with the last value the `reducer`
712 * returned and the emitted value.
713 *
714 * This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits
715 * each value of the reducer.
716 *
717 * @example
718 * ```ts
719 * pipe(
720 * fromArray([1, 2, 3]),
721 * scan((acc, x) => acc + x, 0),
722 * subscribe(x => {
723 * console.log(text); // logs: 1, 3, 6
724 * })
725 * );
726 * ```
727 */
728export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
729 return source => sink => {
730 let acc = seed;
731 source(signal => {
732 if (signal === SignalKind.End) {
733 sink(SignalKind.End);
734 } else if (signal.tag === SignalKind.Start) {
735 sink(signal);
736 } else {
737 sink(push((acc = reducer(acc, signal[0]))));
738 }
739 });
740 };
741}
742
743/** Shares one underlying subscription to the Source between all Sinks.
744 *
745 * @param source - A {@link Source} that should be shared.
746 * @returns A shared {@link Source}.
747 *
748 * @remarks
749 * `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it
750 * will share one subscription to the input source. This allows side-effects on the input
751 * {@link Source} to only be triggerd once.
752 */
753export function share<T>(source: Source<T>): Source<T> {
754 let sinks: Sink<T>[] = [];
755 let talkback = talkbackPlaceholder;
756 let gotSignal = false;
757 return sink => {
758 sinks.push(sink);
759 if (sinks.length === 1) {
760 source(signal => {
761 if (signal === SignalKind.End) {
762 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
763 sinks.length = 0;
764 } else if (signal.tag === SignalKind.Start) {
765 talkback = signal[0];
766 } else {
767 gotSignal = false;
768 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
769 }
770 });
771 }
772 sink(
773 start(signal => {
774 if (signal === TalkbackKind.Close) {
775 const index = sinks.indexOf(sink);
776 if (index > -1) (sinks = sinks.slice()).splice(index, 1);
777 if (!sinks.length) talkback(TalkbackKind.Close);
778 } else if (!gotSignal) {
779 gotSignal = true;
780 talkback(TalkbackKind.Pull);
781 }
782 })
783 );
784 };
785}
786
787/** Omits `wait` amount of values from the Source and then runs as usual.
788 *
789 * @param wait - The number of values to be omitted.
790 * @returns An {@link Operator}.
791 *
792 * @remarks
793 * `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards.
794 * This essentially skips a given number of values on the input {@link Source}.
795 *
796 * @example
797 * ```ts
798 * pipe(
799 * fromArray([1, 2, 3]),
800 * skip(2),
801 * subscribe(x => {
802 * console.log(text); // logs: 3
803 * })
804 * );
805 * ```
806 */
807export function skip<T>(wait: number): Operator<T, T> {
808 return source => sink => {
809 let talkback = talkbackPlaceholder;
810 let rest = wait;
811 source(signal => {
812 if (signal === SignalKind.End) {
813 sink(SignalKind.End);
814 } else if (signal.tag === SignalKind.Start) {
815 talkback = signal[0];
816 sink(signal);
817 } else if (rest-- > 0) {
818 talkback(TalkbackKind.Pull);
819 } else {
820 sink(signal);
821 }
822 });
823 };
824}
825
826/** Omits values from an input Source until a notifier Source emits a value.
827 *
828 * @param notifier - A {@link Source} that starts the operator's sent values.
829 * @returns An {@link Operator}.
830 *
831 * @remarks
832 * `skipUntil` will omit all values from the input {@link Source} until the `notifier`
833 * Source emits a value of its own. It'll then start passing values from the Source through.
834 *
835 * @example
836 * ```ts
837 * pipe(
838 * interval(50),
839 * skipUntil(interval(150)),
840 * subscribe(x => {
841 * console.log(text); // logs: 2, 3...
842 * })
843 * );
844 * ```
845 */
846export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
847 return source => sink => {
848 let sourceTalkback = talkbackPlaceholder;
849 let notifierTalkback = talkbackPlaceholder;
850 let skip = true;
851 let pulled = false;
852 let ended = false;
853 source(signal => {
854 if (ended) {
855 /*noop*/
856 } else if (signal === SignalKind.End) {
857 ended = true;
858 if (skip) notifierTalkback(TalkbackKind.Close);
859 sink(SignalKind.End);
860 } else if (signal.tag === SignalKind.Start) {
861 sourceTalkback = signal[0];
862 notifier(signal => {
863 if (signal === SignalKind.End) {
864 if (skip) {
865 ended = true;
866 sourceTalkback(TalkbackKind.Close);
867 }
868 } else if (signal.tag === SignalKind.Start) {
869 (notifierTalkback = signal[0])(TalkbackKind.Pull);
870 } else {
871 skip = false;
872 notifierTalkback(TalkbackKind.Close);
873 }
874 });
875 } else if (!skip) {
876 pulled = false;
877 sink(signal);
878 } else if (!pulled) {
879 pulled = true;
880 sourceTalkback(TalkbackKind.Pull);
881 notifierTalkback(TalkbackKind.Pull);
882 } else {
883 pulled = false;
884 }
885 });
886 sink(
887 start(signal => {
888 if (signal === TalkbackKind.Close && !ended) {
889 ended = true;
890 sourceTalkback(TalkbackKind.Close);
891 if (skip) notifierTalkback(TalkbackKind.Close);
892 } else if (!ended && !pulled) {
893 pulled = true;
894 if (skip) notifierTalkback(TalkbackKind.Pull);
895 sourceTalkback(TalkbackKind.Pull);
896 }
897 })
898 );
899 };
900}
901
902/** Omits values from an input Source until a predicate function returns `false`.
903 *
904 * @param predicate - A function returning a boolean per value.
905 * @returns An {@link Operator}.
906 *
907 * @remarks
908 * `skipWhile` will omit all values from the input {@link Source} until the `predicate`
909 * function returns `false`. When the `predicate` function returns `false`, the Source's values will
910 * be passed through.
911 *
912 * @example
913 * ```ts
914 * pipe(
915 * fromArray([1, 2, 3]),
916 * skipWhile(x => x < 2),
917 * subscribe(x => {
918 * console.log(text); // logs: 2, 3
919 * })
920 * );
921 * ```
922 */
923export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
924 return source => sink => {
925 let talkback = talkbackPlaceholder;
926 let skip = true;
927 source(signal => {
928 if (signal === SignalKind.End) {
929 sink(SignalKind.End);
930 } else if (signal.tag === SignalKind.Start) {
931 talkback = signal[0];
932 sink(signal);
933 } else if (skip) {
934 if (predicate(signal[0])) {
935 talkback(TalkbackKind.Pull);
936 } else {
937 skip = false;
938 sink(signal);
939 }
940 } else {
941 sink(signal);
942 }
943 });
944 };
945}
946
947/** Emits from the latest Source returned by a mapping function per value of the Source.
948 *
949 * @param map - A function returning a {@link Source} per value.
950 * @returns An {@link Operator}.
951 *
952 * @remarks
953 * `switchMap` accepts a mapping function which must return a {@link Source} per value.
954 * The output {@link Source} will emit values from the latest Source the mapping function
955 * returned. If a value is emitted while the last returned Source is still active, the prior Source
956 * will be closed.
957 *
958 * This can be used to issue multiple values per emission of an input {@link Source}, while only
959 * letting one of these sub-Sources be active at a time.
960 *
961 * @example
962 * ```ts
963 * pipe(
964 * interval(100),
965 * switchMap(() => interval(50)),
966 * subscribe(x => {
967 * console.log(text); // logs: 0, 0, 0...
968 * })
969 * );
970 * ```
971 */
972export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
973 return source => sink => {
974 let outerTalkback = talkbackPlaceholder;
975 let innerTalkback = talkbackPlaceholder;
976 let outerPulled = false;
977 let innerPulled = false;
978 let innerActive = false;
979 let ended = false;
980 function applyInnerSource(innerSource: Source<Out>): void {
981 innerActive = true;
982 innerSource(signal => {
983 if (!innerActive) {
984 /*noop*/
985 } else if (signal === SignalKind.End) {
986 innerActive = false;
987 if (ended) {
988 sink(SignalKind.End);
989 } else if (!outerPulled) {
990 outerPulled = true;
991 outerTalkback(TalkbackKind.Pull);
992 }
993 } else if (signal.tag === SignalKind.Start) {
994 innerPulled = false;
995 (innerTalkback = signal[0])(TalkbackKind.Pull);
996 } else {
997 sink(signal);
998 if (!innerPulled) {
999 innerTalkback(TalkbackKind.Pull);
1000 } else {
1001 innerPulled = false;
1002 }
1003 }
1004 });
1005 }
1006 source(signal => {
1007 if (ended) {
1008 /*noop*/
1009 } else if (signal === SignalKind.End) {
1010 ended = true;
1011 if (!innerActive) sink(SignalKind.End);
1012 } else if (signal.tag === SignalKind.Start) {
1013 outerTalkback = signal[0];
1014 } else {
1015 if (innerActive) {
1016 innerTalkback(TalkbackKind.Close);
1017 innerTalkback = talkbackPlaceholder;
1018 }
1019 if (!outerPulled) {
1020 outerPulled = true;
1021 outerTalkback(TalkbackKind.Pull);
1022 } else {
1023 outerPulled = false;
1024 }
1025 applyInnerSource(map(signal[0]));
1026 }
1027 });
1028 sink(
1029 start(signal => {
1030 if (signal === TalkbackKind.Close) {
1031 if (!ended) {
1032 ended = true;
1033 outerTalkback(TalkbackKind.Close);
1034 }
1035 if (innerActive) {
1036 innerActive = false;
1037 innerTalkback(TalkbackKind.Close);
1038 }
1039 } else {
1040 if (!ended && !outerPulled) {
1041 outerPulled = true;
1042 outerTalkback(TalkbackKind.Pull);
1043 }
1044 if (innerActive && !innerPulled) {
1045 innerPulled = true;
1046 innerTalkback(TalkbackKind.Pull);
1047 }
1048 }
1049 })
1050 );
1051 };
1052}
1053
1054/** Flattens a Source emitting Sources into a single Source emitting the inner values.
1055 *
1056 * @see {@link switchMap} which this helper uses and instead accept a mapping function.
1057 * @param source - An {@link Source} emitting {@link Source | Sources}.
1058 * @returns A {@link Source} emitting values from the inner Sources.
1059 *
1060 * @remarks
1061 * `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. Each time it
1062 * receives a {@link Source} it will close its prior subscription and subscribe to the new Source
1063 * instead, passing through its values.
1064 *
1065 * @example
1066 * ```ts
1067 * pipe(
1068 * interval(100),
1069 * map(() => interval(50)),
1070 * switchAll,
1071 * subscribe(x => {
1072 * console.log(text); // logs: 0, 0, 0...
1073 * })
1074 * );
1075 * ```
1076 */
1077export function switchAll<T>(source: Source<Source<T>>): Source<T> {
1078 return switchMap<Source<T>, T>(identity)(source);
1079}
1080
1081/** Emits `max` values from the Source and then ends.
1082 *
1083 * @param max - The maximum number of values emitted.
1084 * @returns An {@link Operator}.
1085 *
1086 * @remarks
1087 * `take` will issue all values as normal until the `max` number of emitted values has been reached.
1088 * It will then end and close the {@link Source}.
1089 *
1090 * @example
1091 * ```ts
1092 * pipe(
1093 * fromArray([1, 2, 3]),
1094 * take(2),
1095 * subscribe(x => {
1096 * console.log(text); // logs: 1, 2
1097 * })
1098 * );
1099 * ```
1100 */
1101export function take<T>(max: number): Operator<T, T> {
1102 return source => sink => {
1103 let talkback = talkbackPlaceholder;
1104 let ended = false;
1105 let taken = 0;
1106 source(signal => {
1107 if (ended) {
1108 /*noop*/
1109 } else if (signal === SignalKind.End) {
1110 ended = true;
1111 sink(SignalKind.End);
1112 } else if (signal.tag === SignalKind.Start) {
1113 if (max <= 0) {
1114 ended = true;
1115 sink(SignalKind.End);
1116 signal[0](TalkbackKind.Close);
1117 } else {
1118 talkback = signal[0];
1119 }
1120 } else if (taken++ < max) {
1121 sink(signal);
1122 if (!ended && taken >= max) {
1123 ended = true;
1124 sink(SignalKind.End);
1125 talkback(TalkbackKind.Close);
1126 }
1127 } else {
1128 sink(signal);
1129 }
1130 });
1131 sink(
1132 start(signal => {
1133 if (signal === TalkbackKind.Close && !ended) {
1134 ended = true;
1135 talkback(TalkbackKind.Close);
1136 } else if (signal === TalkbackKind.Pull && !ended && taken < max) {
1137 talkback(TalkbackKind.Pull);
1138 }
1139 })
1140 );
1141 };
1142}
1143
1144/** Buffers the `max` last values of the Source and emits them once the Source ends.
1145 *
1146 * @param max - The maximum number of values buffered.
1147 * @returns An {@link Operator}.
1148 *
1149 * @remarks
1150 * `takeLast` will buffer values from the input {@link Source} up until the given `max` number. It
1151 * will only emit values stored in the buffer once the {@link Source} ends.
1152 *
1153 * All values in the buffer are emitted like the {@link fromArray | `fromArray`} source would
1154 * synchronously.
1155 *
1156 * @example
1157 * ```ts
1158 * pipe(
1159 * fromArray([1, 2, 3]),
1160 * takeLast(1),
1161 * subscribe(x => {
1162 * console.log(text); // logs: 3
1163 * })
1164 * );
1165 * ```
1166 */
1167export function takeLast<T>(max: number): Operator<T, T> {
1168 return source => sink => {
1169 const queue: T[] = [];
1170 let talkback = talkbackPlaceholder;
1171 source(signal => {
1172 if (signal === SignalKind.End) {
1173 fromArray(queue)(sink);
1174 } else if (signal.tag === SignalKind.Start) {
1175 if (max <= 0) {
1176 signal[0](TalkbackKind.Close);
1177 fromArray(queue)(sink);
1178 } else {
1179 (talkback = signal[0])(TalkbackKind.Pull);
1180 }
1181 } else {
1182 if (queue.length >= max && max) queue.shift();
1183 queue.push(signal[0]);
1184 talkback(TalkbackKind.Pull);
1185 }
1186 });
1187 };
1188}
1189
1190/** Takes values from an input Source until a notifier Source emits a value.
1191 *
1192 * @param notifier - A {@link Source} that stops the operator's sent values.
1193 * @returns An {@link Operator}.
1194 *
1195 * @remarks
1196 * `takeUntil` will issue all values as normal from the input {@link Source} until the `notifier`
1197 * Source emits a value of its own. It'll then close the {@link Source}.
1198 *
1199 * @example
1200 * ```ts
1201 * pipe(
1202 * interval(50),
1203 * takeUntil(interval(150)),
1204 * subscribe(x => {
1205 * console.log(text); // logs: 0, 1
1206 * })
1207 * );
1208 * ```
1209 */
1210export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
1211 return source => sink => {
1212 let sourceTalkback = talkbackPlaceholder;
1213 let notifierTalkback = talkbackPlaceholder;
1214 let ended = false;
1215 source(signal => {
1216 if (ended) {
1217 /*noop*/
1218 } else if (signal === SignalKind.End) {
1219 ended = true;
1220 notifierTalkback(TalkbackKind.Close);
1221 sink(SignalKind.End);
1222 } else if (signal.tag === SignalKind.Start) {
1223 sourceTalkback = signal[0];
1224 notifier(signal => {
1225 if (signal === SignalKind.End) {
1226 /*noop*/
1227 } else if (signal.tag === SignalKind.Start) {
1228 (notifierTalkback = signal[0])(TalkbackKind.Pull);
1229 } else {
1230 ended = true;
1231 notifierTalkback(TalkbackKind.Close);
1232 sourceTalkback(TalkbackKind.Close);
1233 sink(SignalKind.End);
1234 }
1235 });
1236 } else {
1237 sink(signal);
1238 }
1239 });
1240 sink(
1241 start(signal => {
1242 if (signal === TalkbackKind.Close && !ended) {
1243 ended = true;
1244 sourceTalkback(TalkbackKind.Close);
1245 notifierTalkback(TalkbackKind.Close);
1246 } else if (!ended) {
1247 sourceTalkback(TalkbackKind.Pull);
1248 }
1249 })
1250 );
1251 };
1252}
1253
1254/** Takes values from an input Source until a predicate function returns `false`.
1255 *
1256 * @param predicate - A function returning a boolean per value.
1257 * @param addOne - Lets an additional input value pass on.
1258 * @returns An {@link Operator}.
1259 *
1260 * @remarks
1261 * `takeWhile` will issue all values as normal from the input {@link Source} until the `predicate`
1262 * function returns `false`. When the `predicate` function returns `false`, the current value is
1263 * omitted and the {@link Source} is closed.
1264 *
1265 * If `addOne` is set to `true`, the value for which the `predicate` first returned `false` is
1266 * issued and passed on as well instead of being omitted.
1267 *
1268 * @example
1269 * ```ts
1270 * pipe(
1271 * fromArray([1, 2, 3]),
1272 * takeWhile(x => x < 2),
1273 * subscribe(x => {
1274 * console.log(text); // logs: 1
1275 * })
1276 * );
1277 * ```
1278 */
1279export function takeWhile<T>(predicate: (value: T) => boolean, addOne?: boolean): Operator<T, T> {
1280 return source => sink => {
1281 let talkback = talkbackPlaceholder;
1282 let ended = false;
1283 source(signal => {
1284 if (ended) {
1285 /*noop*/
1286 } else if (signal === SignalKind.End) {
1287 ended = true;
1288 sink(SignalKind.End);
1289 } else if (signal.tag === SignalKind.Start) {
1290 talkback = signal[0];
1291 sink(signal);
1292 } else if (!predicate(signal[0])) {
1293 ended = true;
1294 if (addOne) sink(signal);
1295 sink(SignalKind.End);
1296 talkback(TalkbackKind.Close);
1297 } else {
1298 sink(signal);
1299 }
1300 });
1301 };
1302}
1303
1304/** Debounces a Source by omitting values until a given timeframe has passed.
1305 *
1306 * @param timing - A function returning a debounce time (ms) per emitted value.
1307 * @returns An {@link Operator}.
1308 *
1309 * @remarks
1310 * `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted
1311 * value. All emitted values issued by the {@link Source} during the returned time will be omitted
1312 * until the time has passed.
1313 *
1314 * Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a
1315 * value is let through.
1316 *
1317 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
1318 * too frequently.
1319 *
1320 * @example
1321 * ```ts
1322 * pipe(
1323 * interval(50),
1324 * debounce(() => 100),
1325 * subscribe(x => {
1326 * console.log(text); // never logs any value
1327 * })
1328 * );
1329 * ```
1330 */
1331export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
1332 return source => sink => {
1333 let id: any | void;
1334 let deferredEnded = false;
1335 let ended = false;
1336 source(signal => {
1337 if (ended) {
1338 /*noop*/
1339 } else if (signal === SignalKind.End) {
1340 ended = true;
1341 if (id) {
1342 deferredEnded = true;
1343 } else {
1344 sink(SignalKind.End);
1345 }
1346 } else if (signal.tag === SignalKind.Start) {
1347 const talkback = signal[0];
1348 sink(
1349 start(signal => {
1350 if (signal === TalkbackKind.Close && !ended) {
1351 ended = true;
1352 deferredEnded = false;
1353 if (id) clearTimeout(id);
1354 talkback(TalkbackKind.Close);
1355 } else if (!ended) {
1356 talkback(TalkbackKind.Pull);
1357 }
1358 })
1359 );
1360 } else {
1361 if (id) clearTimeout(id);
1362 id = setTimeout(() => {
1363 id = undefined;
1364 sink(signal);
1365 if (deferredEnded) sink(SignalKind.End);
1366 }, timing(signal[0]));
1367 }
1368 });
1369 };
1370}
1371
1372/** Delays each signal emitted by a Source by given time (ms).
1373 *
1374 * @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed.
1375 * @returns An {@link Operator}.
1376 *
1377 * @remarks
1378 * `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by.
1379 * This will create a timeout per received signal and delay the emitted values accordingly.
1380 *
1381 * Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to
1382 * be ordered. Otherwise, signals will arrive in the wrong order at the sink.
1383 */
1384export function delay<T>(wait: number): Operator<T, T> {
1385 return source => sink => {
1386 let active = 0;
1387 source(signal => {
1388 if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
1389 sink(signal);
1390 } else {
1391 active++;
1392 setTimeout(() => {
1393 if (active) {
1394 active--;
1395 sink(signal);
1396 }
1397 }, wait);
1398 }
1399 });
1400 };
1401}
1402
1403/** Throttles a Source by omitting values that are emitted before a given timeout.
1404 *
1405 * @param timing - A function returning a throttle time (ms) per emitted value.
1406 * @returns An {@link Operator}.
1407 *
1408 * @remarks
1409 * `throttle` accepts a mapping function that can be used to return a time (in ms) per emitted
1410 * value. During the returned timeframe all values issued by the {@link Source} will be omitted and
1411 * dropped.
1412 *
1413 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
1414 * too frequently.
1415 *
1416 * @example
1417 * ```ts
1418 * pipe(
1419 * interval(50),
1420 * throttle(() => 100),
1421 * subscribe(x => {
1422 * // omits every second value: 0, 2, 4...
1423 * console.log(text);
1424 * })
1425 * );
1426 * ```
1427 */
1428export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
1429 return source => sink => {
1430 let skip = false;
1431 let id: any | void;
1432 source(signal => {
1433 if (signal === SignalKind.End) {
1434 if (id) clearTimeout(id);
1435 sink(SignalKind.End);
1436 } else if (signal.tag === SignalKind.Start) {
1437 const talkback = signal[0];
1438 sink(
1439 start(signal => {
1440 if (signal === TalkbackKind.Close) {
1441 if (id) clearTimeout(id);
1442 talkback(TalkbackKind.Close);
1443 } else {
1444 talkback(TalkbackKind.Pull);
1445 }
1446 })
1447 );
1448 } else if (!skip) {
1449 skip = true;
1450 if (id) clearTimeout(id);
1451 id = setTimeout(() => {
1452 id = undefined;
1453 skip = false;
1454 }, timing(signal[0]));
1455 sink(signal);
1456 }
1457 });
1458 };
1459}
1460
1461export { mergeAll as flatten, onPush as tap };