1import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3import { fromArray } from './sources';
4
/** Returns its argument unchanged. */
function identity<T>(x: T): T {
  return x;
}
6
/** Collects values and emits them as an array each time a `notifier` Source emits.
 *
 * @param notifier - A {@link Source} whose emissions release the current buffer.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `buffer` accumulates the values of the input {@link Source}. Whenever the passed `notifier`
 * Source emits, the accumulated values are emitted as one array and a fresh buffer is started.
 *
 * This can be used to group values over time. Empty buffers are never emitted. When either the
 * input Source or the `notifier` ends, any remaining buffered values are emitted before the end
 * signal.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   buffer(interval(100)),
 *   subscribe(x => {
 *     console.log(x); // logs: [0], [1, 2], [3, 4]...
 *   })
 * );
 * ```
 */
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
  return source => sink => {
    let pending: T[] = [];
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let pulled = false;
    let ended = false;

    // Emits whatever is still buffered (if anything) and then ends the output.
    const flushAndEnd = (): void => {
      if (pending.length) sink(push(pending));
      sink(SignalKind.End);
    };

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        flushAndEnd();
        return;
      }
      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        // The notifier is subscribed to only once the input Source has started.
        notifier(notification => {
          if (ended) return;
          if (notification === SignalKind.End) {
            ended = true;
            sourceTalkback(TalkbackKind.Close);
            flushAndEnd();
          } else if (notification.tag === SignalKind.Start) {
            notifierTalkback = notification[0];
          } else if (pending.length) {
            // Swap in a fresh array before emitting the filled one.
            const flushed = push(pending);
            pending = [];
            sink(flushed);
          }
        });
        return;
      }
      pending.push(signal[0]);
      if (pulled) {
        pulled = false;
      } else {
        pulled = true;
        sourceTalkback(TalkbackKind.Pull);
        notifierTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(request => {
        if (ended) return;
        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
89
/** Maps each value to a Source and emits from those Sources one after another, in order.
 *
 * @param map - A function returning a {@link Source} per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `concatMap` accepts a mapping function which must return a {@link Source} per value.
 * Only one inner Source is active at a time: values that arrive while an inner Source is still
 * active are queued, and the mapping function is called for them once the active Source ends.
 *
 * This can be used to issue multiple values per emission of an input {@link Source}, while keeping
 * the order of their outputs consistent.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2]),
 *   concatMap(x => fromArray([x, x * 2])),
 *   subscribe(x => {
 *     console.log(x); // logs: 1, 2, 2, 4
 *   })
 * );
 * ```
 */
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    const queued: In[] = [];
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    // Subscribes to one inner Source and forwards its values to the sink.
    const subscribeInner = (innerSource: Source<Out>): void => {
      innerActive = true;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerActive) return;
          innerActive = false;
          if (queued.length) {
            // Continue with the oldest queued input value.
            subscribeInner(map(queued.shift()!));
          } else if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }
        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }
        if (!innerActive) return;
        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        // The output only ends here when nothing is active or waiting.
        if (!innerActive && !queued.length) sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }
      outerPulled = false;
      if (innerActive) {
        queued.push(signal[0]);
      } else {
        subscribeInner(map(signal[0]));
      }
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }
        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
193
/** Flattens a Source of Sources into one Source that emits the inner values in order.
 *
 * @see {@link concatMap} which this helper uses and which instead accepts a mapping function.
 * @param source - A {@link Source} emitting {@link Source | Sources}.
 * @returns A {@link Source} emitting values from the inner Sources.
 *
 * @remarks
 * `concatAll` accepts a {@link Source} emitting {@link Source | Sources}.
 * It is `concatMap` applied with an identity mapping function, so inner Sources are emitted from
 * one after another, in order, and Sources that aren't yet active are queued.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([
 *     fromArray([1, 2]),
 *     fromArray([3, 4]),
 *   ]),
 *   concatAll,
 *   subscribe(x => {
 *     console.log(x); // logs: 1, 2, 3, 4
 *   })
 * );
 * ```
 */
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
  const flatten = concatMap<Source<T>, T>(identity);
  return flatten(source);
}
222
/** Emits the values of the passed Sources one Source after another.
 *
 * @param sources - An array of {@link Source | Sources}.
 * @returns A {@link Source} emitting values from the input Sources.
 *
 * @remarks
 * `concat` accepts an array of {@link Source | Sources} and will emit values from them, starting
 * with the first one and continuing to the next only when the prior Source ended.
 *
 * This can be used to combine Sources while keeping the order of their values intact.
 *
 * @example
 * ```ts
 * pipe(
 *   concat([
 *     fromArray([1, 2]),
 *     fromArray([3, 4]),
 *   ]),
 *   subscribe(x => {
 *     console.log(x); // logs: 1, 2, 3, 4
 *   })
 * );
 * ```
 */
export function concat<T>(sources: Source<T>[]): Source<T> {
  const sourceOfSources = fromArray(sources);
  return concatAll(sourceOfSources);
}
250
/** Drops emitted values for which the passed predicate function returns `false`.
 *
 * @param predicate - A function returning a boolean per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `filter` only passes on the values of the {@link Source} for which the `predicate` function
 * returns a truthy result. For every value it drops, it pulls the next value from the Source.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   filter(x => x % 2 === 0),
 *   subscribe(x => {
 *     console.log(x); // logs: 2
 *   })
 * );
 * ```
 */
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        // Keep the talkback so rejected values can be replaced by pulling again.
        talkback = signal[0];
        sink(signal);
        return;
      }
      if (predicate(signal[0])) {
        sink(signal);
      } else {
        talkback(TalkbackKind.Pull);
      }
    });
  };
}
288
/** Transforms each emitted value with the passed mapping function.
 *
 * @param map - A function transforming the {@link Source | Source's} values.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `map` accepts a transform function and calls it on each emitted value. It then emits
 * the values returned by the transform function instead. Start and end signals are passed
 * through untouched.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   map(x => x * 2),
 *   subscribe(x => {
 *     console.log(x); // logs: 2, 4, 6
 *   })
 * );
 * ```
 */
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
  return source => sink =>
    source(signal => {
      if (signal !== SignalKind.End && signal.tag !== SignalKind.Start) {
        const mapped = map(signal[0]);
        sink(push(mapped));
      } else {
        sink(signal);
      }
    });
}
319
/** Maps each value to a Source and emits the values of all of those Sources as they arrive.
 *
 * @param map - A function returning a {@link Source} per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `mergeMap` accepts a mapping function which must return a {@link Source} per value.
 * Every returned {@link Source} is subscribed to right away and the output {@link Source} emits
 * the values of all of them. The output ends once the input Source has ended and no inner Source
 * is left.
 *
 * This can be used to issue multiple values per emission of an input {@link Source}, essentially
 * multiplexing all values to multiple Sources.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   mergeMap(x => pipe(
 *     fromValue(x),
 *     delay(100)
 *   )),
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 1, 2...
 *   })
 * );
 * ```
 */
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let innerTalkbacks: TalkbackFn[] = [];
    let outerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let ended = false;

    // Subscribes to one inner Source and tracks its talkback while it is running.
    const subscribeInner = (innerSource: Source<Out>): void => {
      let talkback = talkbackPlaceholder;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerTalkbacks.length) return;
          const index = innerTalkbacks.indexOf(talkback);
          if (index > -1) {
            // Remove from a copy; the talkback loops below iterate the array they captured.
            innerTalkbacks = innerTalkbacks.slice();
            innerTalkbacks.splice(index, 1);
          }
          if (innerTalkbacks.length) return;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }
        if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
          innerTalkbacks.push(talkback);
          talkback(TalkbackKind.Pull);
          return;
        }
        if (innerTalkbacks.length) {
          sink(signal);
          talkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        if (!innerTalkbacks.length) sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }
      outerPulled = false;
      subscribeInner(map(signal[0]));
      if (!outerPulled) {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          const closing = innerTalkbacks;
          for (let i = 0, count = closing.length; i < count; i++) {
            closing[i](TalkbackKind.Close);
          }
          innerTalkbacks.length = 0;
          return;
        }
        if (ended || outerPulled) {
          outerPulled = false;
        } else {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        const pulling = innerTalkbacks;
        for (let i = 0, count = pulling.length; i < count; i++) {
          pulling[i](TalkbackKind.Pull);
        }
      })
    );
  };
}
419
/** Flattens a Source of Sources into one Source that emits all inner values.
 *
 * @see {@link mergeMap} which this helper uses and which instead accepts a mapping function.
 * @param source - A {@link Source} emitting {@link Source | Sources}.
 * @returns A {@link Source} emitting values from the inner Sources.
 *
 * @remarks
 * `mergeAll` accepts a {@link Source} which must emit {@link Source | Sources}. It is `mergeMap`
 * applied with an identity mapping function, so each incoming Source is subscribed to immediately
 * and its emitted values are passed through.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([
 *     interval(50),
 *     interval(100),
 *   ]),
 *   mergeAll,
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 0, 1, 2, 1, 3, 4, 2
 *   })
 * );
 * ```
 */
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
  const flatten = mergeMap<Source<T>, T>(identity);
  return flatten(source);
}
447
/** Emits the values of all passed Sources as they arrive.
 *
 * @param sources - An array of {@link Source | Sources}.
 * @returns A {@link Source} emitting values from the input Sources.
 *
 * @remarks
 * `merge` accepts an array of {@link Source | Sources} and will subscribe to all of them, passing
 * through all their emitted values simultaneously.
 *
 * This can be used to interleave the values of multiple Sources.
 *
 * @example
 * ```ts
 * pipe(
 *   merge([
 *     interval(50),
 *     interval(100),
 *   ]),
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 0, 1, 2, 1, 3, 4, 2
 *   })
 * );
 * ```
 */
export function merge<T>(sources: Source<T>[]): Source<T> {
  const sourceOfSources = fromArray(sources);
  return mergeAll(sourceOfSources);
}
475
/** Calls the passed callback function once when the Source ends or is closed.
 *
 * @param callback - A function that is called when the {@link Source} ends or is closed.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `onEnd` accepts a callback which is called when the {@link Source} either ends
 * or is closed by the sink. The callback runs after the end signal has been passed to the sink,
 * or after the close signal has been passed to the Source.
 *
 * The callback is invoked at most once per subscription: a close signal that arrives after the
 * Source has already ended or been closed is ignored.
 *
 * This operator can be used to add side-effects to a Source.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   take(1),
 *   onEnd(() => {
 *     console.log('end');
 *   }),
 *   publish
 * );
 * ```
 */
export function onEnd<T>(callback: () => void): Operator<T, T> {
  return source => sink => {
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        callback();
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal !== TalkbackKind.Close) {
              talkback(signal);
            } else if (!ended) {
              // Guarded by `ended` so the callback cannot fire a second time on a repeated
              // close or on a close that follows the end signal.
              ended = true;
              talkback(TalkbackKind.Close);
              callback();
            }
          })
        );
      } else {
        sink(signal);
      }
    });
  };
}
528
/** Calls the passed callback function for each value the Source emits.
 *
 * @param callback - A function that is called with each value the {@link Source} emits.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `onPush` accepts a callback which is called for every emitted value of
 * the {@link Source}, just before that value is passed on to the sink. Values arriving after the
 * Source has ended or the sink has closed are ignored.
 *
 * This operator can be used to add side-effects to a Source.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   onPush(value => {
 *     console.log(value); // logs: 1, 2, 3
 *   }),
 *   publish
 * );
 * ```
 */
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
  return source => sink => {
    let ended = false;
    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        const upstream = signal[0];
        // Wrap the talkback so a close from the sink also stops the callback.
        sink(
          start(request => {
            if (request === TalkbackKind.Close) ended = true;
            upstream(request);
          })
        );
        return;
      }
      callback(signal[0]);
      sink(signal);
    });
  };
}
575
/** Calls the passed callback function when the Source starts.
 *
 * @param callback - A function that is called when the {@link Source} is started.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `onStart` accepts a callback which is called when the {@link Source} sends its start signal,
 * right after that signal has been passed on to the sink.
 *
 * This operator can be used to add side-effects to a Source.
 * Specifically, it's useful to add a side-effect for a Source that triggers only once
 * the {@link Source} is used and started.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   onStart(() => {
 *     console.log('start');
 *   }),
 *   publish
 * );
 * ```
 */
export function onStart<T>(callback: () => void): Operator<T, T> {
  return source => sink =>
    source(signal => {
      // Every signal is forwarded unchanged; only the start signal also triggers the callback.
      const isStart = signal !== SignalKind.End && signal.tag === SignalKind.Start;
      sink(signal);
      if (isStart) callback();
    });
}
613
/** Emits the most recent value of the {@link Source} whenever the notifier Source emits a value.
 *
 * @param notifier - A {@link Source} that triggers the last value to be emitted.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `sample` stores the latest value the {@link Source} emitted. Every time the `notifier` Source
 * emits, the stored value is emitted and cleared, so each value is emitted at most once. The
 * output ends as soon as either the input Source or the `notifier` ends.
 *
 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
 * too frequently.
 *
 * {@link Source | Sources} emitting `undefined` are undefined behaviour and these values will be
 * ignored.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   sample(interval(100)),
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 2, 4...
 *   })
 * );
 * ```
 */
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let latest: T | void;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        return;
      }
      latest = signal[0];
      if (pulled) {
        pulled = false;
        return;
      }
      pulled = true;
      notifierTalkback(TalkbackKind.Pull);
      sourceTalkback(TalkbackKind.Pull);
    });

    notifier(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        sourceTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        notifierTalkback = signal[0];
      } else if (latest !== undefined) {
        // Clear the stored value before emitting it.
        const sampled = push(latest);
        latest = undefined;
        sink(sampled);
      }
    });

    sink(
      start(request => {
        if (ended) return;
        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
697
/** Accumulates emitted values with the passed reducer function and emits each intermediate result.
 *
 * @param reducer - A function called with the previous result and the emitted value.
 * @param seed - The initial value that is passed to the `reducer`.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `scan` accepts a reducer function and a seed value. The reducer is first called with the seed
 * value and the first emitted value, and the {@link Source} emits what the reducer returned.
 * After that, the `reducer` is called with its own previous result and the next emitted value.
 *
 * This operator is similar to `Array.prototype.reduce`, but instead is called over time and emits
 * each value of the reducer.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   scan((acc, x) => acc + x, 0),
 *   subscribe(x => {
 *     console.log(x); // logs: 1, 3, 6
 *   })
 * );
 * ```
 */
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
  return source => sink => {
    let accumulated = seed;
    source(signal => {
      if (signal !== SignalKind.End && signal.tag !== SignalKind.Start) {
        accumulated = reducer(accumulated, signal[0]);
        sink(push(accumulated));
      } else {
        // Start and end signals pass through unchanged.
        sink(signal);
      }
    });
  };
}
738
/** Shares a single underlying subscription to the Source between all Sinks.
 *
 * @param source - A {@link Source} that should be shared.
 * @returns A shared {@link Source}.
 *
 * @remarks
 * `share` accepts a {@link Source} and returns one. It will emit all values as normal, however, it
 * subscribes to the input Source only when the list of Sinks goes from empty to one, and passes
 * every value and the end signal on to all current Sinks. This allows side-effects on the input
 * {@link Source} to only be triggered once. The input Source is closed once the last Sink closes.
 */
export function share<T>(source: Source<T>): Source<T> {
  let sinks: Sink<T>[] = [];
  let talkback = talkbackPlaceholder;
  let gotSignal = false;

  return sink => {
    sinks.push(sink);
    const isFirstSink = sinks.length === 1;
    if (isFirstSink) {
      source(signal => {
        if (signal === SignalKind.End) {
          const receivers = sinks;
          for (let i = 0, count = receivers.length; i < count; i++) {
            receivers[i](SignalKind.End);
          }
          sinks.length = 0;
          return;
        }
        if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
          return;
        }
        // A value arrived, so the next pull from any Sink may go upstream again.
        gotSignal = false;
        const receivers = sinks;
        for (let i = 0, count = receivers.length; i < count; i++) {
          receivers[i](signal);
        }
      });
    }

    sink(
      start(request => {
        if (request !== TalkbackKind.Close) {
          // Only the first pull since the last value is sent upstream.
          if (!gotSignal) {
            gotSignal = true;
            talkback(TalkbackKind.Pull);
          }
          return;
        }
        const index = sinks.indexOf(sink);
        if (index > -1) {
          // Remove from a copy; the loops above iterate the array they captured.
          sinks = sinks.slice();
          sinks.splice(index, 1);
        }
        if (!sinks.length) talkback(TalkbackKind.Close);
      })
    );
  };
}
782
/** Drops the first `wait` values of the Source and then passes values on as usual.
 *
 * @param wait - The number of values to be omitted.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `skip` will skip `wait` number of emitted values, then issue all values as normal afterwards.
 * For every value it skips, it pulls the next value from the input {@link Source}.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   skip(2),
 *   subscribe(x => {
 *     console.log(x); // logs: 3
 *   })
 * );
 * ```
 */
export function skip<T>(wait: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let remaining = wait;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }
      const shouldSkip = remaining > 0;
      remaining -= 1;
      if (shouldSkip) {
        talkback(TalkbackKind.Pull);
      } else {
        sink(signal);
      }
    });
  };
}
821
/** Drops values of an input Source until a notifier Source emits a value.
 *
 * @param notifier - A {@link Source} that starts the operator's sent values.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `skipUntil` will omit all values from the input {@link Source} until the `notifier`
 * Source emits a value of its own. It then closes the `notifier` and starts passing the values
 * of the input Source through. If the `notifier` ends before emitting a value, the input Source
 * is closed.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   skipUntil(interval(150)),
 *   subscribe(x => {
 *     console.log(x); // logs: 2, 3...
 *   })
 * );
 * ```
 */
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let skipping = true;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        // The notifier has already been closed once it has emitted.
        if (skipping) notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(notification => {
          if (notification === SignalKind.End) {
            if (skipping) {
              ended = true;
              sourceTalkback(TalkbackKind.Close);
            }
          } else if (notification.tag === SignalKind.Start) {
            notifierTalkback = notification[0];
            notifierTalkback(TalkbackKind.Pull);
          } else {
            skipping = false;
            notifierTalkback(TalkbackKind.Close);
          }
        });
        return;
      }
      if (!skipping) {
        pulled = false;
        sink(signal);
        return;
      }
      if (pulled) {
        pulled = false;
        return;
      }
      pulled = true;
      sourceTalkback(TalkbackKind.Pull);
      notifierTalkback(TalkbackKind.Pull);
    });

    sink(
      start(request => {
        if (ended) return;
        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          if (skipping) notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          if (skipping) notifierTalkback(TalkbackKind.Pull);
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
897
/** Drops values of an input Source until a predicate function returns `false`.
 *
 * @param predicate - A function returning a boolean per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `skipWhile` will omit all values from the input {@link Source} until the `predicate`
 * function returns `false`. From that value on, all values are passed through and the
 * `predicate` is no longer called. For every value it drops, it pulls the next value.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   skipWhile(x => x < 2),
 *   subscribe(x => {
 *     console.log(x); // logs: 2, 3
 *   })
 * );
 * ```
 */
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let skipping = true;
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }
      if (skipping && predicate(signal[0])) {
        talkback(TalkbackKind.Pull);
        return;
      }
      skipping = false;
      sink(signal);
    });
  };
}
942
/** Maps each value to a Source and emits only from the most recently returned Source.
 *
 * @param map - A function returning a {@link Source} per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `switchMap` accepts a mapping function which must return a {@link Source} per value.
 * The output {@link Source} will emit values from the latest Source the mapping function
 * returned. If a value is emitted while the last returned Source is still active, the prior Source
 * will be closed.
 *
 * This can be used to issue multiple values per emission of an input {@link Source}, while only
 * letting one of these sub-Sources be active at a time.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(100),
 *   switchMap(() => interval(50)),
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 0, 0...
 *   })
 * );
 * ```
 */
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    // Subscribes to the given inner Source and forwards its values to the sink.
    const subscribeInner = (innerSource: Source<Out>): void => {
      innerActive = true;
      innerSource(signal => {
        if (!innerActive) return;
        if (signal === SignalKind.End) {
          innerActive = false;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }
        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }
        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive) sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }
      if (innerActive) {
        // Close the previous inner Source before switching to the new one.
        innerTalkback(TalkbackKind.Close);
        innerTalkback = talkbackPlaceholder;
      }
      if (outerPulled) {
        outerPulled = false;
      } else {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }
      subscribeInner(map(signal[0]));
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }
        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
1049
/** Flattens a Source of Sources into one Source that emits from the latest inner Source only.
 *
 * @see {@link switchMap} which this helper uses and which instead accepts a mapping function.
 * @param source - A {@link Source} emitting {@link Source | Sources}.
 * @returns A {@link Source} emitting values from the inner Sources.
 *
 * @remarks
 * `switchAll` accepts a {@link Source} which must emit {@link Source | Sources}. It is
 * `switchMap` applied with an identity mapping function: each time it receives a {@link Source}
 * it will close its prior subscription and subscribe to the new Source instead, passing through
 * its values.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(100),
 *   map(() => interval(50)),
 *   switchAll,
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 0, 0...
 *   })
 * );
 * ```
 */
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
  const flatten = switchMap<Source<T>, T>(identity);
  return flatten(source);
}
1076
/** Passes on up to `max` values of the Source and then ends.
 *
 * @param max - The maximum number of values emitted.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `take` will issue all values as normal until the `max` number of emitted values has been reached.
 * It will then end and close the {@link Source}. When `max` is zero or negative, it ends and
 * closes the Source as soon as the Source starts.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   take(2),
 *   subscribe(x => {
 *     console.log(x); // logs: 1, 2
 *   })
 * );
 * ```
 */
export function take<T>(max: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;
    let taken = 0;

    source(signal => {
      if (ended) return;
      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          ended = true;
          sink(SignalKind.End);
          signal[0](TalkbackKind.Close);
        } else {
          talkback = signal[0];
        }
        return;
      }
      const withinLimit = taken++ < max;
      sink(signal);
      // The sink may have closed while receiving the value, hence the `ended` re-check.
      if (withinLimit && !ended && taken >= max) {
        ended = true;
        sink(SignalKind.End);
        talkback(TalkbackKind.Close);
      }
    });

    sink(
      start(request => {
        if (ended) return;
        if (request === TalkbackKind.Close) {
          ended = true;
          talkback(TalkbackKind.Close);
        } else if (request === TalkbackKind.Pull && taken < max) {
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
1139
/** Keeps the last `max` values of the Source and emits them once the Source ends.
 *
 * @param max - The maximum number of values buffered.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `takeLast` will buffer values from the input {@link Source} up until the given `max` number,
 * dropping the oldest value whenever the buffer is full. It will only emit values stored in the
 * buffer once the {@link Source} ends. When `max` is zero or negative, the Source is closed as
 * soon as it starts and nothing is buffered.
 *
 * The buffered values are emitted by handing the buffer to the {@link fromArray | `fromArray`}
 * source.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   takeLast(1),
 *   subscribe(x => {
 *     console.log(x); // logs: 3
 *   })
 * );
 * ```
 */
export function takeLast<T>(max: number): Operator<T, T> {
  return source => sink => {
    const kept: T[] = [];
    let talkback = talkbackPlaceholder;
    source(signal => {
      if (signal === SignalKind.End) {
        fromArray(kept)(sink);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          signal[0](TalkbackKind.Close);
          fromArray(kept)(sink);
        } else {
          // The input is drained eagerly; the sink only sees values after it ended.
          talkback = signal[0];
          talkback(TalkbackKind.Pull);
        }
        return;
      }
      if (max && kept.length >= max) kept.shift();
      kept.push(signal[0]);
      talkback(TalkbackKind.Pull);
    });
  };
}
1185
/** Passes values of an input Source through until a notifier Source emits a value.
 *
 * @param notifier - A {@link Source} whose first value stops the operator's output.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `takeUntil` forwards all values of the input {@link Source} unchanged. As soon as the `notifier`
 * Source emits a value of its own, the output is ended and both the `notifier` and the input
 * {@link Source} are closed. The `notifier` ending on its own has no effect.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   takeUntil(interval(150)),
 *   subscribe(x => {
 *     console.log(x); // logs: 0, 1
 *   })
 * );
 * ```
 */
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let upstream = talkbackPlaceholder;
    let stopper = talkbackPlaceholder;
    let done = false;

    source(signal => {
      if (done) return;

      if (signal === SignalKind.End) {
        done = true;
        stopper(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        upstream = signal[0];
        // Subscribe to the notifier only once the input source has started.
        notifier(event => {
          if (event === SignalKind.End) return;

          if (event.tag === SignalKind.Start) {
            stopper = event[0];
            stopper(TalkbackKind.Pull);
            return;
          }

          // The notifier emitted a value: stop everything.
          done = true;
          stopper(TalkbackKind.Close);
          upstream(TalkbackKind.Close);
          sink(SignalKind.End);
        });
        return;
      }

      sink(signal);
    });

    sink(
      start(request => {
        if (done) return;

        if (request === TalkbackKind.Close) {
          done = true;
          upstream(TalkbackKind.Close);
          stopper(TalkbackKind.Close);
        } else {
          upstream(TalkbackKind.Pull);
        }
      })
    );
  };
}
1249
/** Passes values of an input Source through for as long as a predicate function returns `true`.
 *
 * @param predicate - A function returning a boolean per value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `takeWhile` forwards all values of the input {@link Source} unchanged until the `predicate`
 * function returns `false` for one of them. That value is not emitted; instead the output is ended
 * and the {@link Source} is closed.
 *
 * @example
 * ```ts
 * pipe(
 *   fromArray([1, 2, 3]),
 *   takeWhile(x => x < 2),
 *   subscribe(x => {
 *     console.log(x); // logs: 1
 *   })
 * );
 * ```
 */
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let upstream = talkbackPlaceholder;
    let finished = false;

    source(signal => {
      if (finished) return;

      if (signal === SignalKind.End) {
        finished = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        // The Start signal is passed on as-is, so the sink talks to the source directly.
        upstream = signal[0];
        sink(signal);
        return;
      }

      if (predicate(signal[0])) {
        sink(signal);
        return;
      }

      // First rejected value: end the output, then close the source.
      finished = true;
      sink(SignalKind.End);
      upstream(TalkbackKind.Close);
    });
  };
}
1294
/** Debounces a Source by omitting values until a given timeframe has passed.
 *
 * @param timing - A function returning a debounce time (ms) per emitted value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `debounce` accepts a mapping function that can be used to return a time (in ms) per emitted
 * value. All emitted values issued by the {@link Source} during the returned time will be omitted
 * until the time has passed.
 *
 * Debouncing means that the returned {@link Source} will wait for a minimum time of silence until a
 * value is let through.
 *
 * If the {@link Source} ends while a value is still waiting, the end is deferred until that value
 * has been emitted. If the sink closes, any waiting value (and a deferred end) is cancelled.
 *
 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
 * too frequently.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   debounce(() => 100),
 *   subscribe(x => {
 *     console.log(x); // never logs any value
 *   })
 * );
 * ```
 */
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    // Handle of the timeout holding back the most recent value, if any.
    let id: any | void;
    // Set when the source ended while a value was still being held back.
    let deferredEnded = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (id) {
          // A value is still pending; the End signal is emitted right after it.
          deferredEnded = true;
        } else {
          sink(SignalKind.End);
        }
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              // Always cancel a pending emission, even when the source has already ended and
              // only the deferred value/End is outstanding; a closed sink must not receive them.
              deferredEnded = false;
              if (id) {
                clearTimeout(id);
                id = undefined;
              }
              // The source only needs to be closed if it has not ended by itself.
              if (!ended) {
                ended = true;
                talkback(TalkbackKind.Close);
              }
            } else if (!ended) {
              talkback(TalkbackKind.Pull);
            }
          })
        );
      } else {
        // A newer value replaces the one currently being held back.
        if (id) clearTimeout(id);
        id = setTimeout(() => {
          id = undefined;
          sink(signal);
          if (deferredEnded) sink(SignalKind.End);
        }, timing(signal[0]));
      }
    });
  };
}
1362
/** Delays each signal emitted by a Source by given time (ms).
 *
 * @param wait - A time (in ms) by which each {@link SignalKind | signal} is delayed.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `delay` accepts a time (in ms) by which each {@link SignalKind | signal} will be delayed by.
 * This will create a timeout per received signal and delay the emitted values accordingly.
 * The start signal is the exception: it is forwarded to the sink immediately.
 *
 * Since the operator only calls `setTimeout` per signal, it relies on the timeout implementation to
 * be ordered. Otherwise, signals will arrive in the wrong order at the sink.
 *
 * When the sink closes, all signals that are still waiting for their timeout are cancelled and the
 * {@link Source} is closed; nothing is delivered to the sink afterwards.
 */
export function delay<T>(wait: number): Operator<T, T> {
  return source => sink => {
    // Handles of the timeouts whose signal has not been delivered yet.
    const pending: ReturnType<typeof setTimeout>[] = [];
    let closed = false;
    source(signal => {
      if (closed) {
        /*noop*/
      } else if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        // Wrap the source's talkback so that a Close from the sink also cancels pending timeouts.
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              if (!closed) {
                closed = true;
                for (const id of pending.splice(0)) clearTimeout(id);
                talkback(TalkbackKind.Close);
              }
            } else if (!closed) {
              talkback(TalkbackKind.Pull);
            }
          })
        );
      } else {
        const id = setTimeout(() => {
          // Only timeouts that were never cancelled fire, so `id` is still tracked here.
          pending.splice(pending.indexOf(id), 1);
          sink(signal);
        }, wait);
        pending.push(id);
      }
    });
  };
}
1393
/** Throttles a Source by dropping values that arrive before a given timeout has elapsed.
 *
 * @param timing - A function returning a throttle time (ms) per emitted value.
 * @returns An {@link Operator}.
 *
 * @remarks
 * `throttle` accepts a mapping function that returns a time (in ms) for each value it lets through.
 * The value is emitted immediately, and every further value issued by the {@link Source} during
 * the returned timeframe is dropped.
 *
 * This is a back pressure operator that can be used to omit values from a {@link Source} coming in
 * too frequently.
 *
 * @example
 * ```ts
 * pipe(
 *   interval(50),
 *   throttle(() => 100),
 *   subscribe(x => {
 *     // omits every second value: 0, 2, 4...
 *     console.log(x);
 *   })
 * );
 * ```
 */
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let throttled = false;
    let timer: any | void;
    // Cancels the timeout that re-opens the gate, if one is scheduled.
    const cancelTimer = () => {
      if (timer) clearTimeout(timer);
    };

    source(signal => {
      if (signal === SignalKind.End) {
        cancelTimer();
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const upstream = signal[0];
        sink(
          start(request => {
            if (request === TalkbackKind.Close) {
              cancelTimer();
              upstream(TalkbackKind.Close);
            } else {
              upstream(TalkbackKind.Pull);
            }
          })
        );
        return;
      }

      // Values arriving while the gate is shut are dropped.
      if (throttled) return;

      throttled = true;
      cancelTimer();
      timer = setTimeout(() => {
        timer = undefined;
        throttled = false;
      }, timing(signal[0]));
      sink(signal);
    });
  };
}
1451
// Alias exports: `tap` is another name for `onPush`, `flatten` another name for `mergeAll`.
export { onPush as tap, mergeAll as flatten };