1import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3import { fromArray } from './sources';
4
/** Returns its argument unchanged; used as the mapper for the `*All` operators. */
function identity<T>(value: T): T {
  return value;
}
6
/**
 * Collects source values into an array and emits that array whenever
 * `notifier` pushes a value. Empty batches are never emitted.
 *
 * When either the source or the notifier ends, the other one is closed, any
 * non-empty batch is flushed to the sink, and the sink is ended.
 *
 * @param notifier - Source whose pushes trigger a flush of the collected values.
 * @returns An operator turning a `Source<T>` into a `Source<T[]>`.
 */
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
  return source => sink => {
    let pending: T[] = [];
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        if (pending.length) sink(push(pending));
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        // The notifier is only subscribed once the source has started.
        notifier(notice => {
          if (ended) return;

          if (notice === SignalKind.End) {
            ended = true;
            sourceTalkback(TalkbackKind.Close);
            if (pending.length) sink(push(pending));
            sink(SignalKind.End);
          } else if (notice.tag === SignalKind.Start) {
            notifierTalkback = notice[0];
          } else if (pending.length) {
            // Swap in a fresh array before emitting so the emitted batch is
            // not mutated by later source values.
            const batch = push(pending);
            pending = [];
            sink(batch);
          }
        });
        return;
      }

      pending.push(signal[0]);
      if (pulled) {
        pulled = false;
      } else {
        pulled = true;
        sourceTalkback(TalkbackKind.Pull);
        notifierTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(request => {
        if (ended) return;

        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
66
/**
 * Maps every source value to an inner source and runs those inner sources
 * strictly one after another. Values that arrive while an inner source is
 * still active are queued and mapped once the active one ends.
 *
 * The sink is ended once the outer source has ended, no inner source is
 * active, and the queue is empty.
 *
 * @param map - Produces the inner source for a given outer value.
 * @returns An operator flattening the mapped inner sources sequentially.
 */
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    const backlog: In[] = [];
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    function applyInnerSource(innerSource: Source<Out>): void {
      innerActive = true;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerActive) return;
          innerActive = false;
          if (backlog.length) {
            // Continue with the next queued outer value.
            applyInnerSource(map(backlog.shift()!));
          } else if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }

        if (!innerActive) return;
        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    }

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive && !backlog.length) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      outerPulled = false;
      if (innerActive) {
        backlog.push(signal[0]);
      } else {
        applyInnerSource(map(signal[0]));
      }
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
146
/**
 * Flattens a source of sources by running each inner source to completion
 * before starting the next; implemented as `concatMap` with the identity mapper.
 */
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenSequentially = concatMap<Source<T>, T>(identity);
  return flattenSequentially(source);
}
150
/**
 * Concatenates an array of sources: the array is turned into a source of
 * sources via `fromArray` and flattened with `concatAll`.
 */
export function concat<T>(sources: Source<T>[]): Source<T> {
  const outer = fromArray(sources);
  return concatAll(outer);
}
154
/**
 * Forwards only the values for which `predicate` returns a truthy result.
 * For every rejected value the next value is pulled from the source, so the
 * sink's outstanding pull is not lost.
 *
 * @param predicate - Decides whether a value is passed on to the sink.
 */
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }

      if (predicate(signal[0])) {
        sink(signal);
      } else {
        talkback(TalkbackKind.Pull);
      }
    });
  };
}
172
/**
 * Transforms every pushed value with `map` before handing it to the sink.
 * Start and End signals are forwarded untouched.
 *
 * @param map - Function applied to each pushed value.
 */
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
  return source => sink =>
    source(signal => {
      if (signal === SignalKind.End) {
        sink(signal);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sink(signal);
        return;
      }

      const mapped = map(signal[0]);
      sink(push(mapped));
    });
}
183
/**
 * Maps every source value to an inner source and forwards the values of all
 * inner sources as they arrive, without waiting for earlier ones to finish.
 *
 * The sink is ended when the outer source has ended and no inner talkbacks
 * remain registered.
 *
 * @param map - Produces the inner source for a given outer value.
 */
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let innerTalkbacks: TalkbackFn[] = [];
    let outerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let ended = false;

    // Sends `kind` to every inner talkback registered at the time of the call.
    // The array and its length are captured up front, like an index loop.
    const broadcast = (kind: TalkbackKind): void => {
      const targets = innerTalkbacks;
      const count = targets.length;
      for (let i = 0; i < count; i++) targets[i](kind);
    };

    function applyInnerSource(innerSource: Source<Out>): void {
      let talkback = talkbackPlaceholder;

      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerTalkbacks.length) return;

          const position = innerTalkbacks.indexOf(talkback);
          if (position > -1) {
            // Copy before removing so an in-progress iteration over the old
            // array is unaffected.
            innerTalkbacks = innerTalkbacks.slice();
            innerTalkbacks.splice(position, 1);
          }

          if (innerTalkbacks.length) return;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
          innerTalkbacks.push(talkback);
          talkback(TalkbackKind.Pull);
          return;
        }

        if (innerTalkbacks.length) {
          sink(signal);
          talkback(TalkbackKind.Pull);
        }
      });
    }

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerTalkbacks.length) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      outerPulled = false;
      applyInnerSource(map(signal[0]));
      // Only pull if starting the inner source did not already trigger a pull.
      if (!outerPulled) {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          broadcast(TalkbackKind.Close);
          innerTalkbacks.length = 0;
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        } else {
          outerPulled = false;
        }
        broadcast(TalkbackKind.Pull);
      })
    );
  };
}
256
/**
 * Flattens a source of sources by merging all inner sources concurrently;
 * implemented as `mergeMap` with the identity mapper.
 */
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenConcurrently = mergeMap<Source<T>, T>(identity);
  return flattenConcurrently(source);
}
260
/**
 * Merges an array of sources: the array is turned into a source of sources
 * via `fromArray` and flattened with `mergeAll`.
 */
export function merge<T>(sources: Source<T>[]): Source<T> {
  const outer = fromArray(sources);
  return mergeAll(outer);
}
264
/**
 * Invokes `callback` once when the stream terminates, either because the
 * source ended or because the sink closed it.
 *
 * The callback runs after the End signal has been forwarded to the sink, or
 * after the Close signal has been forwarded to the source.
 *
 * @param callback - Side effect to run on termination.
 */
export function onEnd<T>(callback: () => void): Operator<T, T> {
  return source => sink => {
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        callback();
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              // Guard against a Close that follows End (or a repeated Close):
              // without it the callback would fire a second time.
              if (!ended) {
                ended = true;
                talkback(TalkbackKind.Close);
                callback();
              }
            } else {
              talkback(signal);
            }
          })
        );
      } else {
        sink(signal);
      }
    });
  };
}
294
/**
 * Runs `callback` with every pushed value before the value is forwarded to
 * the sink. Once the source has ended or the sink has sent Close, further
 * source signals are ignored.
 *
 * @param callback - Side effect invoked for each pushed value.
 */
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
  return source => sink => {
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(request => {
            if (request === TalkbackKind.Close) ended = true;
            talkback(request);
          })
        );
        return;
      }

      callback(signal[0]);
      sink(signal);
    });
  };
}
319
/**
 * Runs `callback` right after the source's Start signal has been forwarded to
 * the sink. All signals are passed through unchanged.
 *
 * @param callback - Side effect invoked when the source starts.
 */
export function onStart<T>(callback: () => void): Operator<T, T> {
  return source => sink =>
    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }

      sink(signal);
      if (signal.tag === SignalKind.Start) callback();
    });
}
333
/**
 * Remembers the latest source value and emits it whenever `notifier` pushes.
 * The remembered value is cleared on emission, so a notifier push without a
 * new source value in between emits nothing.
 *
 * When either the source or the notifier ends, the other one is closed and
 * the sink is ended.
 *
 * @param notifier - Source whose pushes trigger emission of the latest value.
 */
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let value: T | void;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        return;
      }

      value = signal[0];
      if (pulled) {
        pulled = false;
      } else {
        pulled = true;
        notifierTalkback(TalkbackKind.Pull);
        sourceTalkback(TalkbackKind.Pull);
      }
    });

    notifier(notice => {
      if (ended) return;

      if (notice === SignalKind.End) {
        ended = true;
        sourceTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (notice.tag === SignalKind.Start) {
        notifierTalkback = notice[0];
        return;
      }

      if (value !== undefined) {
        // Clear the stored value before emitting so it is sent at most once.
        const latest = push(value);
        value = undefined;
        sink(latest);
      }
    });

    sink(
      start(request => {
        if (ended) return;

        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
391
/**
 * Folds the source values with `reducer`, starting from `seed`, and emits the
 * running accumulator after every pushed value.
 *
 * @param reducer - Combines the current accumulator with the next value.
 * @param seed - Initial accumulator; each subscription starts from it.
 */
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
  return source => sink => {
    let accumulated = seed;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sink(signal);
        return;
      }

      accumulated = reducer(accumulated, signal[0]);
      sink(push(accumulated));
    });
  };
}
406
/**
 * Shares a single subscription to `source` between all sinks. The source is
 * subscribed when the first sink attaches and closed when the last sink
 * detaches; every pushed value and the End signal are broadcast to all
 * currently attached sinks.
 *
 * Pulls from the sinks are de-duplicated: only one Pull is forwarded to the
 * source until the next value arrives.
 *
 * @param source - The source to multicast.
 */
export function share<T>(source: Source<T>): Source<T> {
  let sinks: Sink<T>[] = [];
  let talkback = talkbackPlaceholder;
  let gotSignal = false;
  return sink => {
    sinks.push(sink);
    if (sinks.length === 1) {
      source(signal => {
        if (signal === SignalKind.End) {
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
          sinks.length = 0;
          // Forget any outstanding Pull: the next subscriber restarts the
          // source, and a stale flag would swallow its first Pull.
          gotSignal = false;
        } else if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
        } else {
          gotSignal = false;
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
        }
      });
    }
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          const index = sinks.indexOf(sink);
          // Copy-on-write removal keeps an in-progress broadcast loop stable.
          if (index > -1) (sinks = sinks.slice()).splice(index, 1);
          if (!sinks.length) {
            talkback(TalkbackKind.Close);
            // Same reset as on End, so a later restart can pull again.
            gotSignal = false;
          }
        } else if (!gotSignal) {
          gotSignal = true;
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
440
/**
 * Drops the first `wait` pushed values and forwards everything after them.
 * For every dropped value the next value is pulled from the source.
 *
 * @param wait - Number of leading values to drop.
 */
export function skip<T>(wait: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let remaining = wait;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }

      // The counter is decremented on every push, dropped or not.
      const shouldDrop = remaining > 0;
      remaining -= 1;
      if (shouldDrop) {
        talkback(TalkbackKind.Pull);
      } else {
        sink(signal);
      }
    });
  };
}
459
/**
 * Drops source values until `notifier` pushes its first value, then forwards
 * all subsequent source values. The notifier is closed as soon as it has
 * pushed once.
 *
 * NOTE(review): if the notifier ends while values are still being skipped,
 * the source is closed and the operator marks itself ended, but no End signal
 * is sent to the sink at that point. Confirm this is intended.
 *
 * @param notifier - Source whose first push opens the gate.
 */
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let skip = true;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (skip) notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(notice => {
          if (notice === SignalKind.End) {
            if (skip) {
              ended = true;
              sourceTalkback(TalkbackKind.Close);
            }
            return;
          }

          if (notice.tag === SignalKind.Start) {
            notifierTalkback = notice[0];
            notifierTalkback(TalkbackKind.Pull);
            return;
          }

          // First notifier value: stop skipping and release the notifier.
          skip = false;
          notifierTalkback(TalkbackKind.Close);
        });
        return;
      }

      if (!skip) {
        pulled = false;
        sink(signal);
        return;
      }

      if (pulled) {
        pulled = false;
      } else {
        pulled = true;
        sourceTalkback(TalkbackKind.Pull);
        notifierTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(request => {
        if (ended) return;

        if (request === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          if (skip) notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          if (skip) notifierTalkback(TalkbackKind.Pull);
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
515
/**
 * Drops leading values for as long as `predicate` returns a truthy result.
 * From the first value that fails the predicate onwards, every value is
 * forwarded and the predicate is no longer consulted.
 *
 * @param predicate - Tested against each value while still skipping.
 */
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let skipping = true;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }

      if (skipping && predicate(signal[0])) {
        // Still skipping: ask the source for the next value instead.
        talkback(TalkbackKind.Pull);
        return;
      }

      skipping = false;
      sink(signal);
    });
  };
}
539
/**
 * Maps every source value to an inner source and forwards only the values of
 * the most recently created inner source. When a new outer value arrives, the
 * currently active inner source is closed before the new one is started.
 *
 * The sink is ended once the outer source has ended and no inner source is
 * active.
 *
 * @param map - Produces the inner source for a given outer value.
 */
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    function applyInnerSource(innerSource: Source<Out>): void {
      innerActive = true;
      innerSource(signal => {
        if (!innerActive) return;

        if (signal === SignalKind.End) {
          innerActive = false;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }

        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    }

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      if (innerActive) {
        // Drop the previous inner source before switching to the new one.
        innerTalkback(TalkbackKind.Close);
        innerTalkback = talkbackPlaceholder;
      }

      if (outerPulled) {
        outerPulled = false;
      } else {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }

      applyInnerSource(map(signal[0]));
    });

    sink(
      start(request => {
        if (request === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
621
/**
 * Flattens a source of sources by always switching to the latest inner
 * source; implemented as `switchMap` with the identity mapper.
 */
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenLatest = switchMap<Source<T>, T>(identity);
  return flattenLatest(source);
}
625
/**
 * Forwards values from the source and, once `max` values have been counted,
 * ends the sink and closes the source. With `max <= 0` the sink is ended and
 * the source closed as soon as the source starts.
 *
 * Pulls from the sink are only forwarded while fewer than `max` values have
 * been counted.
 *
 * @param max - Number of values after which the stream is ended.
 */
export function take<T>(max: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;
    let taken = 0;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          ended = true;
          sink(SignalKind.End);
          signal[0](TalkbackKind.Close);
        } else {
          talkback = signal[0];
        }
        return;
      }

      // Count before emitting so a re-entrant Pull from the sink sees the
      // updated total.
      const withinLimit = taken < max;
      taken += 1;
      sink(signal);

      // `ended` is re-checked because the sink may have closed the stream
      // while handling the value above.
      if (withinLimit && !ended && taken >= max) {
        ended = true;
        sink(SignalKind.End);
        talkback(TalkbackKind.Close);
      }
    });

    sink(
      start(request => {
        if (ended) return;

        if (request === TalkbackKind.Close) {
          ended = true;
          talkback(TalkbackKind.Close);
        } else if (request === TalkbackKind.Pull && taken < max) {
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
668
/**
 * Keeps the last `max` source values in a queue and, once the source ends,
 * replays the queue to the sink through `fromArray`. The source is pulled
 * eagerly: once on Start and again after every value.
 *
 * With `max <= 0` the source is closed on Start and the (empty) queue is
 * replayed immediately.
 *
 * @param max - Maximum number of trailing values to retain.
 */
export function takeLast<T>(max: number): Operator<T, T> {
  return source => sink => {
    const tail: T[] = [];
    let talkback = talkbackPlaceholder;

    source(signal => {
      if (signal === SignalKind.End) {
        fromArray(tail)(sink);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          signal[0](TalkbackKind.Close);
          fromArray(tail)(sink);
        } else {
          talkback = signal[0];
          talkback(TalkbackKind.Pull);
        }
        return;
      }

      // Evict the oldest value once the queue is full.
      if (max && tail.length >= max) tail.shift();
      tail.push(signal[0]);
      talkback(TalkbackKind.Pull);
    });
  };
}
691
/**
 * Forwards source values until `notifier` pushes its first value, at which
 * point both the source and the notifier are closed and the sink is ended.
 * A notifier that ends without pushing has no effect.
 *
 * @param notifier - Source whose first push terminates the stream.
 */
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(signal => {
          if (signal === SignalKind.End) {
            /*noop*/
          } else if (signal.tag === SignalKind.Start) {
            notifierTalkback = signal[0];
            // If the stream already finished before the notifier started,
            // release the notifier instead of pulling from it.
            notifierTalkback(ended ? TalkbackKind.Close : TalkbackKind.Pull);
          } else if (!ended) {
            // Guarded by `ended` so a late notifier push cannot send a
            // second End to a sink that has already ended or closed.
            ended = true;
            notifierTalkback(TalkbackKind.Close);
            sourceTalkback(TalkbackKind.Close);
            sink(SignalKind.End);
          }
        });
      } else {
        sink(signal);
      }
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!ended) {
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
735
/**
 * Forwards values for as long as `predicate` returns a truthy result. The
 * first value that fails the predicate is not forwarded; instead the sink is
 * ended and the source is closed.
 *
 * @param predicate - Tested against every pushed value.
 */
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }

      if (predicate(signal[0])) {
        sink(signal);
        return;
      }

      ended = true;
      sink(SignalKind.End);
      talkback(TalkbackKind.Close);
    });
  };
}
759
/**
 * Emits a value only after the duration returned by `timing` has passed
 * without a newer value arriving; each new value cancels the pending one.
 *
 * If the source ends while a value is still pending, the End signal is
 * deferred and sent right after that value is emitted.
 *
 * @param timing - Returns the debounce duration in milliseconds for a value.
 */
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let id: any | void;
    let deferredEnded = false;
    let ended = false;
    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        if (id) {
          // A value is still waiting: send End after it has been emitted.
          deferredEnded = true;
        } else {
          sink(SignalKind.End);
        }
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              // Always drop pending work on Close, even when the source has
              // already ended. Otherwise the armed timer would still deliver
              // the value and the deferred End to a closed sink.
              deferredEnded = false;
              if (id) {
                clearTimeout(id);
                id = undefined;
              }
              if (!ended) {
                ended = true;
                talkback(TalkbackKind.Close);
              }
            } else if (!ended) {
              talkback(TalkbackKind.Pull);
            }
          })
        );
      } else {
        if (id) clearTimeout(id);
        id = setTimeout(() => {
          id = undefined;
          sink(signal);
          if (deferredEnded) sink(SignalKind.End);
        }, timing(signal[0]));
      }
    });
  };
}
800
/**
 * Delays every pushed value and the End signal by `wait` milliseconds. The
 * Start signal is delivered without delay.
 *
 * When the sink sends Close, all pending timers are cancelled so nothing is
 * delivered to the sink afterwards.
 *
 * @param wait - Delay in milliseconds applied to Push and End signals.
 */
export function delay<T>(wait: number): Operator<T, T> {
  return source => sink => {
    const pending: ReturnType<typeof setTimeout>[] = [];
    let closed = false;
    source(signal => {
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        // Wrap the talkback so a Close from the sink can cancel timers that
        // are still waiting to fire.
        sink(
          start(signal => {
            if (signal === TalkbackKind.Close) {
              closed = true;
              for (let i = 0, l = pending.length; i < l; i++) clearTimeout(pending[i]);
              pending.length = 0;
            }
            talkback(signal);
          })
        );
      } else if (!closed) {
        const id = setTimeout(() => {
          const index = pending.indexOf(id);
          if (index > -1) pending.splice(index, 1);
          sink(signal);
        }, wait);
        pending.push(id);
      }
    });
  };
}
819
/**
 * Forwards a value and then ignores all further values until the duration
 * returned by `timing` for the forwarded value has elapsed.
 *
 * The running timer is cleared when the source ends or the sink closes.
 *
 * @param timing - Returns the throttle duration in milliseconds for a value.
 */
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let throttled = false;
    let timer: any | void;

    source(signal => {
      if (signal === SignalKind.End) {
        if (timer) clearTimeout(timer);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(request => {
            if (request === TalkbackKind.Close) {
              if (timer) clearTimeout(timer);
              talkback(TalkbackKind.Close);
            } else {
              talkback(TalkbackKind.Pull);
            }
          })
        );
        return;
      }

      if (throttled) return;

      throttled = true;
      if (timer) clearTimeout(timer);
      // Re-open the gate once the window for this value has passed.
      timer = setTimeout(() => {
        timer = undefined;
        throttled = false;
      }, timing(signal[0]));
      sink(signal);
    });
  };
}
852
853export { mergeAll as flatten, onPush as tap };