1import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3import { fromArray } from './sources';
4
/** Returns its argument unchanged. */
function identity<T>(value: T): T {
  return value;
}
6
/**
 * Collects values pushed by the source into an array and emits that array
 * every time `notifier` pushes a value (empty batches are not emitted).
 *
 * When either the source or the notifier ends, the other one is closed, any
 * values still buffered are emitted as a final batch, and the sink is ended.
 *
 * @param notifier - Source whose pushes trigger emission of the current batch.
 * @returns An operator turning a source of `T` into a source of `T[]`.
 */
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
  return source => sink => {
    let pending: T[] = [];
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let pulled = false;
    let ended = false;

    // Emits the remaining batch (if any) followed by the End signal.
    const flushAndEnd = (): void => {
      if (pending.length) sink(push(pending));
      sink(SignalKind.End);
    };

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        flushAndEnd();
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        // The notifier is only subscribed to once the source has started.
        notifier(notification => {
          if (ended) return;

          if (notification === SignalKind.End) {
            ended = true;
            sourceTalkback(TalkbackKind.Close);
            flushAndEnd();
          } else if (notification.tag === SignalKind.Start) {
            notifierTalkback = notification[0];
          } else if (pending.length) {
            // Swap in a fresh array before emitting so the emitted batch is
            // never appended to afterwards.
            const batch = push(pending);
            pending = [];
            sink(batch);
          }
        });
        return;
      }

      pending.push(signal[0]);
      if (pulled) {
        pulled = false;
      } else {
        pulled = true;
        sourceTalkback(TalkbackKind.Pull);
        notifierTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(signal => {
        if (ended) return;

        if (signal === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
66
/**
 * Combines two sources into a source of `[A, B]` tuples holding the most
 * recent value of each.
 *
 * Nothing is emitted until both sources have pushed at least once; a stored
 * value of `undefined` is treated as "no value yet". The combined source ends
 * once both inputs have ended.
 *
 * @param sourceA - Source providing the first tuple element.
 * @param sourceB - Source providing the second tuple element.
 * @returns A source of the latest `[A, B]` pairs.
 */
export function combine<A, B>(sourceA: Source<A>, sourceB: Source<B>): Source<[A, B]> {
  return sink => {
    let latestA: A | void;
    let latestB: B | void;
    let talkbackA = talkbackPlaceholder;
    let talkbackB = talkbackPlaceholder;
    let gotSignal = false;
    let gotEnd = false;
    let ended = false;

    // The first End is only recorded; the second one ends the sink.
    const handleEnd = (): void => {
      if (gotEnd) {
        ended = true;
        sink(SignalKind.End);
      } else {
        gotEnd = true;
      }
    };

    sourceA(signal => {
      if (signal === SignalKind.End) {
        handleEnd();
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkbackA = signal[0];
        return;
      }
      if (latestB === undefined) {
        // B has not produced a value yet: remember A and nudge B.
        latestA = signal[0];
        if (gotSignal) {
          gotSignal = false;
        } else {
          talkbackB(TalkbackKind.Pull);
        }
        return;
      }
      if (ended) return;

      latestA = signal[0];
      gotSignal = false;
      sink(push([latestA, latestB] as [A, B]));
    });

    sourceB(signal => {
      if (signal === SignalKind.End) {
        handleEnd();
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkbackB = signal[0];
        return;
      }
      if (latestA === undefined) {
        // A has not produced a value yet: remember B and nudge A.
        latestB = signal[0];
        if (gotSignal) {
          gotSignal = false;
        } else {
          talkbackA(TalkbackKind.Pull);
        }
        return;
      }
      if (ended) return;

      latestB = signal[0];
      gotSignal = false;
      sink(push([latestA, latestB] as [A, B]));
    });

    sink(
      start(signal => {
        if (ended) return;

        if (signal === TalkbackKind.Close) {
          ended = true;
          talkbackA(TalkbackKind.Close);
          talkbackB(TalkbackKind.Close);
        } else if (!gotSignal) {
          gotSignal = true;
          talkbackA(TalkbackKind.Pull);
          talkbackB(TalkbackKind.Pull);
        }
      })
    );
  };
}
139
/**
 * Maps every source value to an inner source and emits the inner sources'
 * values strictly one inner source after another.
 *
 * Outer values that arrive while an inner source is still running are queued
 * and processed in arrival order. The sink is ended once the outer source has
 * ended, no inner source is active and the queue is empty.
 *
 * @param map - Produces the inner source for an outer value.
 * @returns An operator flattening the mapped inner sources sequentially.
 */
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    const backlog: In[] = [];
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    // Subscribes to one inner source and relays its values to the sink.
    const runInner = (innerSource: Source<Out>): void => {
      innerActive = true;
      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerActive) return;
          innerActive = false;
          if (backlog.length) {
            // Continue with the oldest queued outer value.
            runInner(map(backlog.shift()!));
          } else if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }

        if (!innerActive) return;
        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive && !backlog.length) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      outerPulled = false;
      if (innerActive) {
        backlog.push(signal[0]);
      } else {
        runInner(map(signal[0]));
      }
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
219
/**
 * Flattens a source of sources via `concatMap` with the identity mapping, so
 * each emitted source is itself used as the inner source.
 *
 * @param source - Source emitting the inner sources.
 * @returns A source of the inner sources' values.
 */
export function concatAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenSequentially = concatMap<Source<T>, T>(identity);
  return flattenSequentially(source);
}
223
/**
 * Concatenates an array of sources by turning the array into a source with
 * `fromArray` and flattening it with `concatAll`.
 *
 * @param sources - The sources to concatenate, in order.
 * @returns A single source built from `sources`.
 */
export function concat<T>(sources: Source<T>[]): Source<T> {
  const sourceOfSources = fromArray(sources);
  return concatAll(sourceOfSources);
}
227
/**
 * Forwards only the values for which `predicate` returns `true`.
 *
 * For every rejected value a Pull is sent upstream. Start and End signals
 * are passed through.
 *
 * @param predicate - Decides whether a value is forwarded.
 * @returns An operator emitting the accepted values only.
 */
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }
      if (predicate(signal[0])) {
        sink(signal);
      } else {
        talkback(TalkbackKind.Pull);
      }
    });
  };
}
245
/**
 * Transforms every pushed value with `map` before forwarding it.
 * Start and End signals are forwarded untouched.
 *
 * @param map - Transformation applied to each value.
 * @returns An operator emitting the transformed values.
 */
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
  return source => sink =>
    source(signal => {
      if (signal !== SignalKind.End && signal.tag !== SignalKind.Start) {
        // A pushed value: re-wrap the mapped result as a new Push signal.
        sink(push(map(signal[0])));
      } else {
        sink(signal);
      }
    });
}
256
/**
 * Maps every source value to an inner source and emits the values of all
 * inner sources as they arrive, running the inner sources concurrently.
 *
 * The sink is ended once the outer source has ended and no inner source is
 * registered any more.
 *
 * @param map - Produces the inner source for an outer value.
 * @returns An operator merging the mapped inner sources.
 */
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let innerTalkbacks: TalkbackFn[] = [];
    let outerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let ended = false;

    // Subscribes to one inner source and tracks its talkback while it runs.
    const runInner = (innerSource: Source<Out>): void => {
      let ownTalkback = talkbackPlaceholder;

      innerSource(signal => {
        if (signal === SignalKind.End) {
          if (!innerTalkbacks.length) return;

          const position = innerTalkbacks.indexOf(ownTalkback);
          if (position > -1) {
            // Replace the list instead of mutating it so that loops iterating
            // over the previous array are unaffected.
            const remaining = innerTalkbacks.slice();
            remaining.splice(position, 1);
            innerTalkbacks = remaining;
          }

          if (innerTalkbacks.length) return;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          ownTalkback = signal[0];
          innerTalkbacks.push(ownTalkback);
          ownTalkback(TalkbackKind.Pull);
          return;
        }

        if (innerTalkbacks.length) {
          sink(signal);
          ownTalkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerTalkbacks.length) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      outerPulled = false;
      runInner(map(signal[0]));
      // The inner source may already have pulled the outer source while it
      // was being set up; only pull if that did not happen.
      if (!outerPulled) {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          // Iterate over the array reference and length captured up front.
          const toClose = innerTalkbacks;
          const closeCount = toClose.length;
          for (let i = 0; i < closeCount; i++) {
            toClose[i](TalkbackKind.Close);
          }
          innerTalkbacks.length = 0;
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        } else {
          outerPulled = false;
        }
        // Iterate over the array reference and length captured up front.
        const toPull = innerTalkbacks;
        const pullCount = toPull.length;
        for (let i = 0; i < pullCount; i++) {
          toPull[i](TalkbackKind.Pull);
        }
      })
    );
  };
}
329
/**
 * Flattens a source of sources via `mergeMap` with the identity mapping, so
 * each emitted source is itself used as the inner source.
 *
 * @param source - Source emitting the inner sources.
 * @returns A source of the inner sources' values.
 */
export function mergeAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenConcurrently = mergeMap<Source<T>, T>(identity);
  return flattenConcurrently(source);
}
333
/**
 * Merges an array of sources by turning the array into a source with
 * `fromArray` and flattening it with `mergeAll`.
 *
 * @param sources - The sources to merge.
 * @returns A single source built from `sources`.
 */
export function merge<T>(sources: Source<T>[]): Source<T> {
  const sourceOfSources = fromArray(sources);
  return mergeAll(sourceOfSources);
}
337
/**
 * Invokes `callback` when the stream terminates, either because the source
 * sent End or because the sink sent Close.
 *
 * The callback runs at most once per subscription: a Close that arrives after
 * the stream already terminated (after End, or a repeated Close) is ignored
 * and is not forwarded upstream again.
 *
 * @param callback - Invoked once on termination.
 * @returns An operator that forwards all signals unchanged.
 */
export function onEnd<T>(callback: () => void): Operator<T, T> {
  return source => sink => {
    let ended = false;

    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        callback();
      } else if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(talkbackSignal => {
            if (talkbackSignal !== TalkbackKind.Close) {
              talkback(talkbackSignal);
            } else if (!ended) {
              // Guarded by `ended` so termination is reported exactly once.
              ended = true;
              talkback(TalkbackKind.Close);
              callback();
            }
          })
        );
      } else {
        sink(signal);
      }
    });
  };
}
367
/**
 * Invokes `callback` with every pushed value before the value is forwarded
 * to the sink.
 *
 * Once the source has ended or the sink has sent Close, further source
 * signals are ignored.
 *
 * @param callback - Side effect run for each value.
 * @returns An operator that forwards all values unchanged.
 */
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
  return source => sink => {
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(talkbackSignal => {
            // Remember a Close so that later source signals are dropped.
            if (talkbackSignal === TalkbackKind.Close) ended = true;
            talkback(talkbackSignal);
          })
        );
        return;
      }

      callback(signal[0]);
      sink(signal);
    });
  };
}
392
/**
 * Invokes `callback` right after the source's Start signal has been
 * forwarded to the sink. All signals are passed through unchanged.
 *
 * @param callback - Invoked after Start has been forwarded.
 * @returns An operator that forwards all signals unchanged.
 */
export function onStart<T>(callback: () => void): Operator<T, T> {
  return source => sink =>
    source(signal => {
      // Every signal is forwarded first; only Start triggers the callback.
      sink(signal);
      if (signal !== SignalKind.End && signal.tag === SignalKind.Start) {
        callback();
      }
    });
}
406
/**
 * Emits the most recent source value every time `notifier` pushes.
 *
 * A value is emitted at most once: after emission the stored value is reset,
 * and notifier pushes are ignored while no value is stored (a stored value
 * of `undefined` counts as "no value"). When either the source or the
 * notifier ends, the other is closed and the sink is ended.
 *
 * @param notifier - Source whose pushes trigger emission.
 * @returns An operator emitting sampled source values.
 */
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let latest: T | void;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        return;
      }

      latest = signal[0];
      if (pulled) {
        pulled = false;
        return;
      }
      pulled = true;
      notifierTalkback(TalkbackKind.Pull);
      sourceTalkback(TalkbackKind.Pull);
    });

    notifier(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sourceTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        notifierTalkback = signal[0];
        return;
      }

      if (latest === undefined) return;
      // Reset the stored value before emitting so it is sampled only once.
      const sampled = push(latest);
      latest = undefined;
      sink(sampled);
    });

    sink(
      start(signal => {
        if (ended) return;

        if (signal === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          sourceTalkback(TalkbackKind.Pull);
          notifierTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
464
/**
 * Folds the source values with `reducer`, starting from `seed`, and emits
 * the accumulator after every pushed value.
 *
 * @param reducer - Computes the next accumulator from the current one and a value.
 * @param seed - Initial accumulator.
 * @returns An operator emitting each intermediate accumulator.
 */
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
  return source => sink => {
    let accumulator = seed;

    source(signal => {
      if (signal === SignalKind.End || signal.tag === SignalKind.Start) {
        sink(signal);
        return;
      }
      accumulator = reducer(accumulator, signal[0]);
      sink(push(accumulator));
    });
  };
}
479
/**
 * Shares a single subscription to `source` between all sinks.
 *
 * The source is subscribed to when the first sink arrives and closed when
 * the last sink sends Close. If a sink subscribes after that (or after the
 * source ended), the source is subscribed to again.
 *
 * @param source - The source to share.
 * @returns A source that multicasts `source` to every subscribed sink.
 */
export function share<T>(source: Source<T>): Source<T> {
  let sinks: Sink<T>[] = [];
  let talkback = talkbackPlaceholder;
  let gotSignal = false;

  return sink => {
    sinks.push(sink);

    if (sinks.length === 1) {
      // A new upstream subscription starts here. Reset the per-subscription
      // state first: a stale `gotSignal` left over from a previous
      // subscription would swallow the first Pull of the new one, and a stale
      // talkback would address the previous, already finished subscription.
      talkback = talkbackPlaceholder;
      gotSignal = false;

      source(signal => {
        if (signal === SignalKind.End) {
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
          sinks.length = 0;
        } else if (signal.tag === SignalKind.Start) {
          talkback = signal[0];
        } else {
          gotSignal = false;
          for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
        }
      });
    }

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          const index = sinks.indexOf(sink);
          if (index > -1) {
            // Copy before removing so that a broadcast loop that is currently
            // iterating over the old array is not disturbed.
            (sinks = sinks.slice()).splice(index, 1);
            // Only the Close that removes the last sink closes upstream; a
            // Close from a sink that is no longer registered is ignored.
            if (!sinks.length) talkback(TalkbackKind.Close);
          }
        } else if (!gotSignal) {
          gotSignal = true;
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
513
/**
 * Drops the first `wait` values of the source and forwards everything after.
 *
 * For every dropped value a Pull is sent upstream. Start and End signals are
 * passed through.
 *
 * @param wait - Number of leading values to drop.
 * @returns An operator emitting the values after the dropped prefix.
 */
export function skip<T>(wait: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let remaining = wait;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }
      if (remaining > 0) {
        remaining--;
        talkback(TalkbackKind.Pull);
      } else {
        sink(signal);
      }
    });
  };
}
532
/**
 * Drops source values until `notifier` pushes its first value, then forwards
 * all subsequent source values.
 *
 * The notifier is closed as soon as it has pushed once. If the notifier ends
 * before pushing anything, the source is closed and further source signals
 * are ignored; no End is sent to the sink in that case.
 *
 * @param notifier - Source whose first push ends the skipping phase.
 * @returns An operator emitting the source values after the notifier fired.
 */
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let skipping = true;
    let pulled = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (skipping) notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        // The notifier is only subscribed to once the source has started.
        notifier(notification => {
          if (notification === SignalKind.End) {
            if (skipping) {
              ended = true;
              sourceTalkback(TalkbackKind.Close);
            }
          } else if (notification.tag === SignalKind.Start) {
            notifierTalkback = notification[0];
            notifierTalkback(TalkbackKind.Pull);
          } else {
            skipping = false;
            notifierTalkback(TalkbackKind.Close);
          }
        });
        return;
      }

      if (!skipping) {
        pulled = false;
        sink(signal);
        return;
      }

      // Still skipping: the value is dropped.
      if (pulled) {
        pulled = false;
        return;
      }
      pulled = true;
      sourceTalkback(TalkbackKind.Pull);
      notifierTalkback(TalkbackKind.Pull);
    });

    sink(
      start(signal => {
        if (ended) return;

        if (signal === TalkbackKind.Close) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          if (skipping) notifierTalkback(TalkbackKind.Close);
        } else if (!pulled) {
          pulled = true;
          if (skipping) notifierTalkback(TalkbackKind.Pull);
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
588
/**
 * Drops source values while `predicate` returns `true`; from the first value
 * for which it returns `false` onwards, every value is forwarded and the
 * predicate is no longer called.
 *
 * For every dropped value a Pull is sent upstream.
 *
 * @param predicate - Returns `true` for values that should still be dropped.
 * @returns An operator emitting the values after the dropped prefix.
 */
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let skipping = true;

    source(signal => {
      if (signal === SignalKind.End) {
        sink(SignalKind.End);
        return;
      }
      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }
      if (skipping && predicate(signal[0])) {
        talkback(TalkbackKind.Pull);
        return;
      }
      skipping = false;
      sink(signal);
    });
  };
}
612
/**
 * Maps every source value to an inner source and emits the values of the
 * most recent inner source only: when a new outer value arrives, the current
 * inner source is closed and replaced.
 *
 * The sink is ended once the outer source has ended and no inner source is
 * active.
 *
 * @param map - Produces the inner source for an outer value.
 * @returns An operator emitting the values of the latest inner source.
 */
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
  return source => sink => {
    let outerTalkback = talkbackPlaceholder;
    let innerTalkback = talkbackPlaceholder;
    let outerPulled = false;
    let innerPulled = false;
    let innerActive = false;
    let ended = false;

    // Subscribes to one inner source and relays its values to the sink.
    const runInner = (innerSource: Source<Out>): void => {
      innerActive = true;
      innerSource(signal => {
        if (!innerActive) return;

        if (signal === SignalKind.End) {
          innerActive = false;
          if (ended) {
            sink(SignalKind.End);
          } else if (!outerPulled) {
            outerPulled = true;
            outerTalkback(TalkbackKind.Pull);
          }
          return;
        }

        if (signal.tag === SignalKind.Start) {
          innerPulled = false;
          innerTalkback = signal[0];
          innerTalkback(TalkbackKind.Pull);
          return;
        }

        sink(signal);
        if (innerPulled) {
          innerPulled = false;
        } else {
          innerTalkback(TalkbackKind.Pull);
        }
      });
    };

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (!innerActive) sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        outerTalkback = signal[0];
        return;
      }

      // A new outer value replaces whatever inner source is running.
      if (innerActive) {
        innerTalkback(TalkbackKind.Close);
        innerTalkback = talkbackPlaceholder;
      }

      if (outerPulled) {
        outerPulled = false;
      } else {
        outerPulled = true;
        outerTalkback(TalkbackKind.Pull);
      }

      runInner(map(signal[0]));
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          if (!ended) {
            ended = true;
            outerTalkback(TalkbackKind.Close);
          }
          if (innerActive) {
            innerActive = false;
            innerTalkback(TalkbackKind.Close);
          }
          return;
        }

        if (!ended && !outerPulled) {
          outerPulled = true;
          outerTalkback(TalkbackKind.Pull);
        }
        if (innerActive && !innerPulled) {
          innerPulled = true;
          innerTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
694
/**
 * Flattens a source of sources via `switchMap` with the identity mapping, so
 * each emitted source is itself used as the inner source.
 *
 * @param source - Source emitting the inner sources.
 * @returns A source of the inner sources' values.
 */
export function switchAll<T>(source: Source<Source<T>>): Source<T> {
  const flattenLatest = switchMap<Source<T>, T>(identity);
  return flattenLatest(source);
}
698
/**
 * Forwards at most `max` values from the source, then ends the sink and
 * closes the source.
 *
 * With `max <= 0` the sink is ended and the source is closed as soon as the
 * source starts. Values the source pushes beyond the limit are dropped; this
 * can happen when a push arrives re-entrantly while the last allowed value is
 * still being delivered to the sink.
 *
 * @param max - Maximum number of values to forward.
 * @returns An operator emitting no more than `max` values.
 */
export function take<T>(max: number): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;
    let taken = 0;

    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          ended = true;
          sink(SignalKind.End);
          signal[0](TalkbackKind.Close);
        } else {
          talkback = signal[0];
        }
      } else if (taken < max) {
        taken++;
        sink(signal);
        // `ended` may already be set if the sink closed while handling the
        // value, or if a nested push completed the quota in the meantime.
        if (!ended && taken >= max) {
          ended = true;
          sink(SignalKind.End);
          talkback(TalkbackKind.Close);
        }
      }
      // Otherwise the quota is already used up (a nested push arrived before
      // End was sent): the value is dropped so that never more than `max`
      // values reach the sink.
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          talkback(TalkbackKind.Close);
        } else if (signal === TalkbackKind.Pull && !ended && taken < max) {
          talkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
741
/**
 * Emits only the last `max` values of the source, after the source ended.
 *
 * Values are kept in a queue that holds at most `max` entries; the source is
 * pulled eagerly. When the source ends, the queue is replayed to the sink via
 * `fromArray`. With `max <= 0` the source is closed on Start and the (empty)
 * queue is replayed immediately.
 *
 * @param max - Number of trailing values to keep.
 * @returns An operator emitting the last `max` source values.
 */
export function takeLast<T>(max: number): Operator<T, T> {
  return source => sink => {
    const tail: T[] = [];
    let talkback = talkbackPlaceholder;

    source(signal => {
      if (signal === SignalKind.End) {
        fromArray(tail)(sink);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        if (max <= 0) {
          signal[0](TalkbackKind.Close);
          fromArray(tail)(sink);
        } else {
          talkback = signal[0];
          talkback(TalkbackKind.Pull);
        }
        return;
      }

      // Drop the oldest entry once the queue is full.
      if (max && tail.length >= max) tail.shift();
      tail.push(signal[0]);
      talkback(TalkbackKind.Pull);
    });
  };
}
764
/**
 * Forwards source values until `notifier` pushes its first value, then ends
 * the sink and closes both the source and the notifier.
 *
 * Termination happens exactly once: notifier pushes that arrive after the
 * stream has already terminated are ignored. If the notifier ends without
 * pushing, the source keeps being forwarded.
 *
 * @param notifier - Source whose first push terminates the stream.
 * @returns An operator emitting source values until the notifier fires.
 */
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
  return source => sink => {
    let sourceTalkback = talkbackPlaceholder;
    let notifierTalkback = talkbackPlaceholder;
    let ended = false;

    source(signal => {
      if (ended) {
        /*noop*/
      } else if (signal === SignalKind.End) {
        ended = true;
        notifierTalkback(TalkbackKind.Close);
        sink(SignalKind.End);
      } else if (signal.tag === SignalKind.Start) {
        sourceTalkback = signal[0];
        notifier(notification => {
          if (notification === SignalKind.End) {
            /*noop*/
          } else if (notification.tag === SignalKind.Start) {
            notifierTalkback = notification[0];
            // If the stream already terminated before the notifier started,
            // close the notifier right away instead of pulling from it.
            notifierTalkback(ended ? TalkbackKind.Close : TalkbackKind.Pull);
          } else if (!ended) {
            ended = true;
            // The notifier has done its job; close it so it does not keep
            // running (and firing) after the stream has terminated.
            notifierTalkback(TalkbackKind.Close);
            sourceTalkback(TalkbackKind.Close);
            sink(SignalKind.End);
          }
        });
      } else {
        sink(signal);
      }
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          sourceTalkback(TalkbackKind.Close);
          notifierTalkback(TalkbackKind.Close);
        } else if (!ended) {
          sourceTalkback(TalkbackKind.Pull);
        }
      })
    );
  };
}
807
/**
 * Forwards source values while `predicate` returns `true`. The first value
 * for which it returns `false` is not forwarded; instead the sink is ended
 * and the source is closed.
 *
 * @param predicate - Returns `true` for values that should be forwarded.
 * @returns An operator emitting the leading values accepted by `predicate`.
 */
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
  return source => sink => {
    let talkback = talkbackPlaceholder;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        sink(signal);
        return;
      }

      if (predicate(signal[0])) {
        sink(signal);
        return;
      }

      ended = true;
      sink(SignalKind.End);
      talkback(TalkbackKind.Close);
    });
  };
}
831
/**
 * Emits a source value only after `timing(value)` milliseconds have passed
 * without another value arriving; each new value cancels the pending one.
 *
 * If the source ends while a value is pending, End is deferred until that
 * value has been emitted. A Close from the sink cancels the pending value.
 *
 * @param timing - Returns the debounce duration for a value, passed to `setTimeout`.
 * @returns An operator emitting debounced values.
 */
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let timer: any | void;
    let deferredEnded = false;
    let ended = false;

    source(signal => {
      if (ended) return;

      if (signal === SignalKind.End) {
        ended = true;
        if (timer) {
          // A value is still pending: End is sent after it has been emitted.
          deferredEnded = true;
        } else {
          sink(SignalKind.End);
        }
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(talkbackSignal => {
            if (ended) return;

            if (talkbackSignal === TalkbackKind.Close) {
              ended = true;
              deferredEnded = false;
              if (timer) clearTimeout(timer);
              talkback(TalkbackKind.Close);
            } else {
              talkback(TalkbackKind.Pull);
            }
          })
        );
        return;
      }

      if (timer) clearTimeout(timer);
      const wait = timing(signal[0]);
      timer = setTimeout(() => {
        timer = undefined;
        sink(signal);
        if (deferredEnded) sink(SignalKind.End);
      }, wait);
    });
  };
}
872
/**
 * Delays every pushed value and the End signal by `wait` milliseconds.
 * The Start signal is forwarded immediately.
 *
 * When the sink sends Close, all pending timers are cancelled and later
 * source signals are ignored, so nothing is delivered to a sink after it
 * closed the stream.
 *
 * @param wait - Delay in milliseconds, passed to `setTimeout`.
 * @returns An operator emitting the source's signals after the delay.
 */
export function delay<T>(wait: number): Operator<T, T> {
  return source => sink => {
    let timers: ReturnType<typeof setTimeout>[] = [];
    let closed = false;

    source(signal => {
      if (typeof signal !== 'number' && signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(talkbackSignal => {
            if (talkbackSignal === TalkbackKind.Close && !closed) {
              closed = true;
              // Cancel everything that has been scheduled but not delivered.
              for (let i = 0, l = timers.length; i < l; i++) clearTimeout(timers[i]);
              timers = [];
            }
            talkback(talkbackSignal);
          })
        );
      } else if (!closed) {
        const id = setTimeout(() => {
          const index = timers.indexOf(id);
          if (index > -1) timers.splice(index, 1);
          sink(signal);
        }, wait);
        timers.push(id);
      }
    });
  };
}
891
/**
 * Forwards a source value and then drops all further values until
 * `timing(value)` milliseconds have elapsed.
 *
 * A pending timer is cleared when the source ends or the sink sends Close.
 *
 * @param timing - Returns the throttle duration for a forwarded value, passed to `setTimeout`.
 * @returns An operator emitting throttled values.
 */
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
  return source => sink => {
    let throttled = false;
    let timer: any | void;

    source(signal => {
      if (signal === SignalKind.End) {
        if (timer) clearTimeout(timer);
        sink(SignalKind.End);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        const talkback = signal[0];
        sink(
          start(talkbackSignal => {
            if (talkbackSignal === TalkbackKind.Close) {
              if (timer) clearTimeout(timer);
              talkback(TalkbackKind.Close);
            } else {
              talkback(TalkbackKind.Pull);
            }
          })
        );
        return;
      }

      // Values arriving inside the throttle window are dropped.
      if (throttled) return;

      throttled = true;
      if (timer) clearTimeout(timer);
      const wait = timing(signal[0]);
      timer = setTimeout(() => {
        timer = undefined;
        throttled = false;
      }, wait);
      sink(signal);
    });
  };
}
924
// Alternative export names: `flatten` is `mergeAll`, `tap` is `onPush`.
export { mergeAll as flatten };
export { onPush as tap };