1open Jest;
2open Wonka_types;
3
/* Alias so that specs below read as `it("...", () => ...)`. */
let it = Jest.test;
5
6describe("source factories", () => {
7 describe("fromList", () => {
8 open Expect;
9 open! Expect.Operators;
10
11 it("sends list items to a puller sink", () => {
12 let source = Wonka.fromList([10, 20, 30]);
13 let talkback = ref((. _: Wonka_types.talkbackT) => ());
14 let signals = [||];
15
16 source((. signal) =>
17 switch (signal) {
18 | Start(x) => talkback := x
19 | Push(_) => ignore(Js.Array.push(signal, signals))
20 | End => ignore(Js.Array.push(signal, signals))
21 }
22 );
23
24 talkback^(. Pull);
25 talkback^(. Pull);
26 talkback^(. Pull);
27 talkback^(. Pull);
28
29 expect(signals) == [|Push(10), Push(20), Push(30), End|];
30 });
31 });
32
33 describe("fromArray", () => {
34 open Expect;
35 open! Expect.Operators;
36
37 it("sends array items to a puller sink", () => {
38 let source = Wonka.fromArray([|10, 20, 30|]);
39 let talkback = ref((. _: Wonka_types.talkbackT) => ());
40 let signals = ref([||]);
41
42 source((. signal) =>
43 switch (signal) {
44 | Start(x) =>
45 talkback := x;
46 x(. Pull);
47 | Push(_) =>
48 signals := Array.append(signals^, [|signal|]);
49 talkback^(. Pull);
50 | End => signals := Array.append(signals^, [|signal|])
51 }
52 );
53
54 expect(signals^) == [|Push(10), Push(20), Push(30), End|];
55 });
56
57 it("does not blow up the stack when iterating something huge", () => {
58 let arr = Array.make(100000, 123);
59 let source = Wonka.fromArray(arr);
60 let talkback = ref((. _: Wonka_types.talkbackT) => ());
61 let values = [||];
62
63 source((. signal) =>
64 switch (signal) {
65 | Start(x) =>
66 talkback := x;
67 x(. Pull);
68 | Push(x) =>
69 ignore(Js.Array.push(x, values));
70 talkback^(. Pull);
71 | End => ()
72 }
73 );
74
75 expect(Array.length(values)) == Array.length(arr);
76 });
77 });
78
79 describe("fromValue", () => {
80 open Expect;
81 open! Expect.Operators;
82
83 it("sends a single item to a puller sink", () => {
84 let source = Wonka.fromValue(123);
85 let talkback = ref((. _: Wonka_types.talkbackT) => ());
86 let signals = [||];
87
88 source((. signal) =>
89 switch (signal) {
90 | Start(x) => talkback := x
91 | Push(_) => ignore(Js.Array.push(signal, signals))
92 | End => ignore(Js.Array.push(signal, signals))
93 }
94 );
95
96 talkback^(. Pull);
97 talkback^(. Pull);
98 talkback^(. Pull); /* one extra to check whether no signal comes back after it has ended */
99
100 expect(signals) == [|Push(123), End|];
101 });
102 });
103
104 describe("empty", () => {
105 open Expect;
106 open! Expect.Operators;
107
108 it("ends immediately", () => {
109 let talkback = ref((. _: Wonka_types.talkbackT) => ());
110 let signals = [||];
111
112 Wonka.empty((. signal) =>
113 switch (signal) {
114 | Start(x) => talkback := x
115 | _ => ignore(Js.Array.push(signal, signals))
116 }
117 );
118
119 let _signals = Array.copy(signals);
120
121 talkback^(. Pull);
122 talkback^(. Pull);
123
124 expect((_signals, signals)) == ([|End|], [|End|]);
125 });
126 });
127
128 describe("never", () => {
129 open Expect;
130 open! Expect.Operators;
131
132 it("does not end", () => {
133 let talkback = ref((. _: Wonka_types.talkbackT) => ());
134 let ended = ref(false);
135
136 Wonka.never((. signal) =>
137 switch (signal) {
138 | Start(x) => talkback := x
139 | End => ended := true
140 | _ => ()
141 }
142 );
143
144 talkback^(. Pull);
145 talkback^(. Pull);
146
147 expect(ended^) === false;
148 });
149 });
150});
151
152describe("operator factories", () => {
153 describe("map", () => {
154 open Expect;
155
156 it("maps all emissions of a source", () => {
157 let num = ref(1);
158 let nums = [||];
159 let talkback = ref((. _: Wonka_types.talkbackT) => ());
160
161 Wonka.map(
162 (. _) => {
163 let res = num^;
164 num := num^ + 1;
165 res;
166 },
167 sink =>
168 sink(.
169 Start(
170 (. signal) =>
171 switch (signal) {
172 | Pull => sink(. Push(1))
173 | _ => ()
174 },
175 ),
176 ),
177 (. signal) =>
178 switch (signal) {
179 | Start(x) =>
180 talkback := x;
181 x(. Pull);
182 | Push(x) when num^ < 6 =>
183 ignore(Js.Array.push(x, nums));
184 talkback^(. Pull);
185 | _ => ()
186 },
187 );
188
189 expect(nums) |> toEqual([|1, 2, 3, 4|]);
190 });
191
192 testPromise("follows the spec for listenables", () =>
193 Wonka_thelpers.testWithListenable(Wonka.map((. x) => x))
194 |> Js.Promise.then_(x =>
195 expect(x)
196 |> toEqual(([||], [|Push(1), Push(2), End|]))
197 |> Js.Promise.resolve
198 )
199 );
200
201 testPromise(
202 "ends itself and source when its talkback receives the End signal", () => {
203 let end_: talkbackT = Close;
204
205 Wonka_thelpers.testTalkbackEnd(Wonka.map((. x) => x))
206 |> Js.Promise.then_(x =>
207 expect(x)
208 |> toEqual(([|end_|], [|Push(1)|]))
209 |> Js.Promise.resolve
210 );
211 });
212 });
213
214 describe("filter", () => {
215 open Expect;
216
217 it("filters emissions according to a predicate", () => {
218 let i = ref(1);
219 let nums = [||];
220 let talkback = ref((. _: Wonka_types.talkbackT) => ());
221
222 Wonka.filter(
223 (. x) => x mod 2 === 0,
224 sink =>
225 sink(.
226 Start(
227 (. signal) =>
228 switch (signal) {
229 | Pull =>
230 let num = i^;
231 i := i^ + 1;
232 sink(. Push(num));
233 | _ => ()
234 },
235 ),
236 ),
237 (. signal) =>
238 switch (signal) {
239 | Start(x) =>
240 talkback := x;
241 x(. Pull);
242 | Push(x) when x < 6 =>
243 ignore(Js.Array.push(x, nums));
244 talkback^(. Pull);
245 | _ => ()
246 },
247 );
248
249 expect(nums) |> toEqual([|2, 4|]);
250 });
251
252 testPromise("follows the spec for listenables", () =>
253 Wonka_thelpers.testWithListenable(Wonka.filter((. _) => true))
254 |> Js.Promise.then_(x =>
255 expect(x)
256 |> toEqual(([||], [|Push(1), Push(2), End|]))
257 |> Js.Promise.resolve
258 )
259 );
260
261 testPromise("follows the spec for listenables when filtering", () =>
262 Wonka_thelpers.testWithListenable(Wonka.filter((. _) => false))
263 |> Js.Promise.then_(x =>
264 expect(x)
265 |> toEqual(([|Pull, Pull|], [|End|]))
266 |> Js.Promise.resolve
267 )
268 );
269
270 testPromise(
271 "ends itself and source when its talkback receives the End signal", () => {
272 let end_: talkbackT = Close;
273
274 Wonka_thelpers.testTalkbackEnd(Wonka.filter((. _) => true))
275 |> Js.Promise.then_(x =>
276 expect(x)
277 |> toEqual(([|end_|], [|Push(1)|]))
278 |> Js.Promise.resolve
279 );
280 });
281 });
282
283 describe("scan", () => {
284 open Expect;
285
286 it("folds emissions using an initial seed value", () => {
287 let talkback = ref((. _: Wonka_types.talkbackT) => ());
288 let num = ref(1);
289
290 let source =
291 Wonka.scan(
292 (. acc, x) => acc + x,
293 0,
294 sink =>
295 sink(.
296 Start(
297 (. signal) =>
298 switch (signal) {
299 | Pull =>
300 let i = num^;
301 if (i <= 3) {
302 num := num^ + 1;
303 sink(. Push(i));
304 } else {
305 sink(. End);
306 };
307 | _ => ()
308 },
309 ),
310 ),
311 );
312
313 let res = [||];
314
315 source((. signal) =>
316 switch (signal) {
317 | Start(x) => talkback := x
318 | _ => ignore(Js.Array.push(signal, res))
319 }
320 );
321
322 talkback^(. Pull);
323 talkback^(. Pull);
324 talkback^(. Pull);
325 talkback^(. Pull);
326 expect(res) |> toEqual([|Push(1), Push(3), Push(6), End|]);
327 });
328
329 testPromise("follows the spec for listenables", () =>
330 Wonka_thelpers.testWithListenable(Wonka.scan((. _, x) => x, 0))
331 |> Js.Promise.then_(x =>
332 expect(x)
333 |> toEqual(([||], [|Push(1), Push(2), End|]))
334 |> Js.Promise.resolve
335 )
336 );
337
338 testPromise(
339 "ends itself and source when its talkback receives the End signal", () => {
340 let end_: talkbackT = Close;
341
342 Wonka_thelpers.testTalkbackEnd(Wonka.scan((. _, x) => x, 0))
343 |> Js.Promise.then_(x =>
344 expect(x)
345 |> toEqual(([|end_|], [|Push(1)|]))
346 |> Js.Promise.resolve
347 );
348 });
349 });
350
351 describe("merge", () => {
352 open Expect;
353 open! Expect.Operators;
354
355 it("merges different sources into a single one", () => {
356 let a = Wonka.fromList([1, 2, 3]);
357 let b = Wonka.fromList([4, 5, 6]);
358 let talkback = ref((. _: Wonka_types.talkbackT) => ());
359 let signals = [||];
360 let source = Wonka.merge([|a, b|]);
361
362 source((. signal) =>
363 switch (signal) {
364 | Start(x) =>
365 talkback := x;
366 x(. Pull);
367 | Push(_) =>
368 ignore(Js.Array.push(signal, signals));
369 talkback^(. Pull);
370 | End => ignore(Js.Array.push(signal, signals))
371 }
372 );
373
374 expect(signals)
375 == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
376 });
377
378 testPromise("follows the spec for listenables", () =>
379 Wonka_thelpers.testWithListenable(source => Wonka.merge([|source|]))
380 |> Js.Promise.then_(x =>
381 expect(x)
382 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
383 |> Js.Promise.resolve
384 )
385 );
386
387 testPromise(
388 "ends itself and source when its talkback receives the End signal", () =>
389 Wonka_thelpers.testTalkbackEnd(source => Wonka.merge([|source|]))
390 |> Js.Promise.then_(x =>
391 expect(x)
392 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
393 |> Js.Promise.resolve
394 )
395 );
396 });
397
398 describe("concat", () => {
399 open Expect;
400 open! Expect.Operators;
401
402 it("concatenates different sources into a single one", () => {
403 let a = Wonka.fromList([1, 2, 3]);
404 let b = Wonka.fromList([4, 5, 6]);
405 let talkback = ref((. _: Wonka_types.talkbackT) => ());
406 let signals = [||];
407 let source = Wonka.concat([|a, b|]);
408
409 source((. signal) =>
410 switch (signal) {
411 | Start(x) =>
412 talkback := x;
413 x(. Pull);
414 | Push(_) =>
415 ignore(Js.Array.push(signal, signals));
416 talkback^(. Pull);
417 | End => ignore(Js.Array.push(signal, signals))
418 }
419 );
420
421 expect(signals)
422 == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
423 });
424
425 testPromise("follows the spec for listenables", () =>
426 Wonka_thelpers.testWithListenable(source => Wonka.concat([|source|]))
427 |> Js.Promise.then_(x =>
428 expect(x)
429 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
430 |> Js.Promise.resolve
431 )
432 );
433
434 testPromise(
435 "ends itself and source when its talkback receives the End signal", () =>
436 Wonka_thelpers.testTalkbackEnd(source => Wonka.concat([|source|]))
437 |> Js.Promise.then_(x =>
438 expect(x)
439 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
440 |> Js.Promise.resolve
441 )
442 );
443 });
444
445 describe("share", () => {
446 open Expect;
447
448 it("shares an underlying source with all sinks", () => {
449 let talkback = ref((. _: Wonka_types.talkbackT) => ());
450 let aborterTb = ref((. _: Wonka_types.talkbackT) => ());
451 let num = ref(1);
452 let nums = [||];
453
454 let source =
455 Wonka.share(sink =>
456 sink(.
457 Start(
458 (. signal) =>
459 switch (signal) {
460 | Pull =>
461 let i = num^;
462 if (i <= 2) {
463 num := num^ + 1;
464 sink(. Push(i));
465 } else {
466 sink(. End);
467 };
468 | _ => ()
469 },
470 ),
471 )
472 );
473
474 source((. signal) =>
475 switch (signal) {
476 | Start(x) => talkback := x
477 | _ => ignore(Js.Array.push(signal, nums))
478 }
479 );
480
481 source((. signal) =>
482 switch (signal) {
483 | Start(_) => ()
484 | _ => ignore(Js.Array.push(signal, nums))
485 }
486 );
487
488 source((. signal) =>
489 switch (signal) {
490 | Start(tb) => aborterTb := tb
491 | _ =>
492 ignore(Js.Array.push(signal, nums));
493 aborterTb^(. Close);
494 }
495 );
496
497 talkback^(. Pull);
498 let numsA = Array.copy(nums);
499 talkback^(. Pull);
500 talkback^(. Pull);
501 talkback^(. Pull);
502 expect((numsA, nums))
503 |> toEqual((
504 [|Push(1), Push(1), Push(1)|],
505 [|Push(1), Push(1), Push(1), Push(2), Push(2), End, End|],
506 ));
507 });
508
509 testPromise("follows the spec for listenables", () =>
510 Wonka_thelpers.testWithListenable(Wonka.share)
511 |> Js.Promise.then_(x =>
512 expect(x)
513 |> toEqual(([||], [|Push(1), Push(2), End|]))
514 |> Js.Promise.resolve
515 )
516 );
517
518 testPromise(
519 "ends itself and source when its talkback receives the End signal", () => {
520 let end_: talkbackT = Close;
521
522 Wonka_thelpers.testTalkbackEnd(Wonka.share)
523 |> Js.Promise.then_(x =>
524 expect(x)
525 |> toEqual(([|end_|], [|Push(1)|]))
526 |> Js.Promise.resolve
527 );
528 });
529 });
530
531 describe("combine", () => {
532 open Expect;
533
534 it("combines the latest values of two sources", () => {
535 let talkback = ref((. _: Wonka_types.talkbackT) => ());
536
537 let makeSource = (factor: int) => {
538 let num = ref(1);
539
540 sink => {
541 sink(.
542 Start(
543 (. signal) =>
544 switch (signal) {
545 | Pull =>
546 if (num^ <= 2) {
547 let i = num^ * factor;
548 num := num^ + 1;
549 sink(. Push(i));
550 } else {
551 sink(. End);
552 }
553 | _ => ()
554 },
555 ),
556 );
557 };
558 };
559
560 let sourceA = makeSource(1);
561 let sourceB = makeSource(2);
562 let source = Wonka.combine(sourceA, sourceB);
563 let res = [||];
564
565 source((. signal) =>
566 switch (signal) {
567 | Start(x) => talkback := x
568 | _ => ignore(Js.Array.push(signal, res))
569 }
570 );
571
572 talkback^(. Pull);
573 talkback^(. Pull);
574 talkback^(. Pull);
575 talkback^(. Pull);
576 expect(res)
577 |> toEqual([|Push((1, 2)), Push((2, 2)), Push((2, 4)), End|]);
578 });
579
580 testPromise("follows the spec for listenables", () =>
581 Wonka_thelpers.testWithListenable(source => {
582 let shared = Wonka.share(source);
583 Wonka.combine(shared, shared);
584 })
585 |> Js.Promise.then_(x =>
586 expect(x)
587 |> toEqual((
588 [||],
589 [|Push((1, 1)), Push((2, 1)), Push((2, 2)), End|],
590 ))
591 |> Js.Promise.resolve
592 )
593 );
594
595 testPromise(
596 "ends itself and source when its talkback receives the End signal", () => {
597 let end_: talkbackT = Close;
598
599 Wonka_thelpers.testTalkbackEnd(source => {
600 let shared = Wonka.share(source);
601 Wonka.combine(shared, shared);
602 })
603 |> Js.Promise.then_(x =>
604 expect(x)
605 |> toEqual(([|end_|], [|Push((1, 1))|]))
606 |> Js.Promise.resolve
607 );
608 });
609 });
610
611 describe("take", () => {
612 open Expect;
613
614 it("only lets a maximum number of values through", () => {
615 let talkback = ref((. _: Wonka_types.talkbackT) => ());
616 let num = ref(1);
617
618 let source =
619 Wonka.take(2, sink =>
620 sink(.
621 Start(
622 (. signal) =>
623 switch (signal) {
624 | Pull =>
625 let i = num^;
626 num := num^ + 1;
627 sink(. Push(i));
628 | _ => ()
629 },
630 ),
631 )
632 );
633
634 let res = [||];
635
636 source((. signal) =>
637 switch (signal) {
638 | Start(x) => talkback := x
639 | _ => ignore(Js.Array.push(signal, res))
640 }
641 );
642
643 talkback^(. Pull);
644 talkback^(. Pull);
645 talkback^(. Pull);
646 talkback^(. Pull);
647 expect(res) |> toEqual([|Push(1), Push(2), End|]);
648 });
649
650 it(
651 "accepts the end of the source when max number of emissions is not reached",
652 () => {
653 let talkback = ref((. _: Wonka_types.talkbackT) => ());
654 let num = ref(1);
655
656 let source =
657 Wonka.take(2, sink =>
658 sink(.
659 Start(
660 (. signal) =>
661 switch (signal) {
662 | Pull =>
663 let i = num^;
664 if (i < 2) {
665 num := num^ + 1;
666 sink(. Push(i));
667 } else {
668 sink(. End);
669 };
670 | _ => ()
671 },
672 ),
673 )
674 );
675
676 let res = [||];
677
678 source((. signal) =>
679 switch (signal) {
680 | Start(x) => talkback := x
681 | _ => ignore(Js.Array.push(signal, res))
682 }
683 );
684
685 talkback^(. Pull);
686 talkback^(. Pull);
687 talkback^(. Pull);
688 expect(res) |> toEqual([|Push(1), End|]);
689 });
690
691 testPromise("follows the spec for listenables", () =>
692 Wonka_thelpers.testWithListenable(Wonka.take(10))
693 |> Js.Promise.then_(x =>
694 expect(x)
695 |> toEqual(([||], [|Push(1), Push(2), End|]))
696 |> Js.Promise.resolve
697 )
698 );
699
700 testPromise("follows the spec for listenables when ending the source", () => {
701 let end_: talkbackT = Close;
702
703 Wonka_thelpers.testWithListenable(Wonka.take(1))
704 |> Js.Promise.then_(x =>
705 expect(x)
706 |> toEqual(([|end_|], [|Push(1), End|]))
707 |> Js.Promise.resolve
708 );
709 });
710
711 testPromise(
712 "ends itself and source when its talkback receives the End signal", () => {
713 let end_: talkbackT = Close;
714
715 Wonka_thelpers.testTalkbackEnd(Wonka.take(10))
716 |> Js.Promise.then_(x =>
717 expect(x)
718 |> toEqual(([|end_|], [|Push(1)|]))
719 |> Js.Promise.resolve
720 );
721 });
722 });
723
724 describe("takeLast", () => {
725 open Expect;
726
727 it("only lets the last n values through on an entirely new source", () => {
728 let talkback = ref((. _: Wonka_types.talkbackT) => ());
729 let num = ref(1);
730
731 let source =
732 Wonka.takeLast(2, sink =>
733 sink(.
734 Start(
735 (. signal) =>
736 switch (signal) {
737 | Pull when num^ <= 4 =>
738 let i = num^;
739 num := num^ + 1;
740 sink(. Push(i));
741 | Pull => sink(. End)
742 | _ => ()
743 },
744 ),
745 )
746 );
747
748 let res = [||];
749
750 source((. signal) =>
751 switch (signal) {
752 | Start(x) => talkback := x
753 | _ => ignore(Js.Array.push(signal, res))
754 }
755 );
756
757 talkback^(. Pull);
758 talkback^(. Pull);
759 talkback^(. Pull);
760 expect(res) |> toEqual([|Push(3), Push(4), End|]);
761 });
762
763 testPromise("follows the spec for listenables", () =>
764 Wonka_thelpers.testWithListenable(Wonka.takeLast(10))
765 |> Js.Promise.then_(x =>
766 expect(x)
767 |> toEqual((
768 [|Pull, Pull, Pull|],
769 [|/* empty since the source is a pullable */|],
770 ))
771 |> Js.Promise.resolve
772 )
773 );
774
775 testPromise(
776 "ends itself and source when its talkback receives the End signal", () =>
777 Wonka_thelpers.testTalkbackEnd(Wonka.takeLast(10))
778 |> Js.Promise.then_(x =>
779 expect(x)
780 |> toEqual(([|Pull, Pull|], [||]))
781 |> Js.Promise.resolve
782 )
783 );
784 });
785
786 describe("takeWhile", () => {
787 open Expect;
788
789 it("only lets the last n values through on an entirely new source", () => {
790 let talkback = ref((. _: Wonka_types.talkbackT) => ());
791 let num = ref(1);
792
793 let source =
794 Wonka.takeWhile(
795 (. x) => x <= 2,
796 sink =>
797 sink(.
798 Start(
799 (. signal) =>
800 switch (signal) {
801 | Pull =>
802 let i = num^;
803 num := num^ + 1;
804 sink(. Push(i));
805 | _ => ()
806 },
807 ),
808 ),
809 );
810
811 let res = [||];
812
813 source((. signal) =>
814 switch (signal) {
815 | Start(x) => talkback := x
816 | _ => ignore(Js.Array.push(signal, res))
817 }
818 );
819
820 talkback^(. Pull);
821 talkback^(. Pull);
822 talkback^(. Pull);
823 talkback^(. Pull);
824
825 expect(res) |> toEqual([|Push(1), Push(2), End|]);
826 });
827
828 it(
829 "accepts the end of the source when max number of emissions is not reached",
830 () => {
831 let talkback = ref((. _: Wonka_types.talkbackT) => ());
832 let num = ref(1);
833
834 let source =
835 Wonka.takeWhile(
836 (. x) => x <= 5,
837 sink =>
838 sink(.
839 Start(
840 (. signal) =>
841 switch (signal) {
842 | Pull =>
843 let i = num^;
844 if (i < 2) {
845 num := num^ + 1;
846 sink(. Push(i));
847 } else {
848 sink(. End);
849 };
850 | _ => ()
851 },
852 ),
853 ),
854 );
855
856 let res = [||];
857
858 source((. signal) =>
859 switch (signal) {
860 | Start(x) => talkback := x
861 | _ => ignore(Js.Array.push(signal, res))
862 }
863 );
864
865 talkback^(. Pull);
866 talkback^(. Pull);
867 talkback^(. Pull);
868
869 expect(res) |> toEqual([|Push(1), End|]);
870 });
871
872 testPromise("follows the spec for listenables", () =>
873 Wonka_thelpers.testWithListenable(Wonka.takeWhile((. _) => true))
874 |> Js.Promise.then_(x =>
875 expect(x)
876 |> toEqual(([||], [|Push(1), Push(2), End|]))
877 |> Js.Promise.resolve
878 )
879 );
880
881 testPromise("follows the spec for listenables when ending the source", () => {
882 let end_: talkbackT = Close;
883
884 Wonka_thelpers.testWithListenable(Wonka.takeWhile((. _) => false))
885 |> Js.Promise.then_(x =>
886 expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
887 );
888 });
889
890 testPromise(
891 "ends itself and source when its talkback receives the End signal", () => {
892 let end_: talkbackT = Close;
893
894 Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((. _) => true))
895 |> Js.Promise.then_(x =>
896 expect(x)
897 |> toEqual(([|end_|], [|Push(1)|]))
898 |> Js.Promise.resolve
899 );
900 });
901 });
902
903 describe("takeUntil", () => {
904 open Expect;
905
906 it("only lets the last n values through on an entirely new source", () => {
907 let talkback = ref((. _: Wonka_types.talkbackT) => ());
908 let notify = ref((_: Wonka_types.talkbackT) => ());
909 let num = ref(1);
910
911 let notifier = sink => {
912 notify :=
913 (
914 signal =>
915 switch (signal) {
916 | Pull => sink(. Push(0))
917 | _ => ()
918 }
919 );
920
921 sink(. Start(Wonka_helpers.talkbackPlaceholder));
922 };
923
924 let source =
925 Wonka.takeUntil(notifier, sink =>
926 sink(.
927 Start(
928 (. signal) =>
929 switch (signal) {
930 | Pull when num^ <= 4 =>
931 let i = num^;
932 if (i === 3) {
933 notify^(Pull);
934 };
935 num := num^ + 1;
936 sink(. Push(i));
937 | _ => ()
938 },
939 ),
940 )
941 );
942
943 let res = [||];
944
945 source((. signal) =>
946 switch (signal) {
947 | Start(x) => talkback := x
948 | _ => ignore(Js.Array.push(signal, res))
949 }
950 );
951
952 talkback^(. Pull);
953 talkback^(. Pull);
954 talkback^(. Pull);
955 talkback^(. Pull);
956
957 expect(res) |> toEqual([|Push(1), Push(2), End|]);
958 });
959
960 it(
961 "accepts the end of the source when max number of emissions is not reached",
962 () => {
963 let talkback = ref((. _: Wonka_types.talkbackT) => ());
964 let num = ref(1);
965 let notifier = sink =>
966 sink(. Start(Wonka_helpers.talkbackPlaceholder));
967
968 let source =
969 Wonka.takeUntil(notifier, sink =>
970 sink(.
971 Start(
972 (. signal) =>
973 switch (signal) {
974 | Pull =>
975 let i = num^;
976 if (num^ <= 2) {
977 num := num^ + 1;
978 sink(. Push(i));
979 } else {
980 sink(. End);
981 };
982 | _ => ()
983 },
984 ),
985 )
986 );
987
988 let res = [||];
989
990 source((. signal) =>
991 switch (signal) {
992 | Start(x) => talkback := x
993 | _ => ignore(Js.Array.push(signal, res))
994 }
995 );
996
997 talkback^(. Pull);
998 talkback^(. Pull);
999 talkback^(. Pull);
1000 talkback^(. Pull);
1001
1002 expect(res) |> toEqual([|Push(1), Push(2), End|]);
1003 });
1004
1005 testPromise("follows the spec for listenables", () =>
1006 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.never))
1007 |> Js.Promise.then_(x =>
1008 expect(x)
1009 |> toEqual(([||], [|Push(1), Push(2), End|]))
1010 |> Js.Promise.resolve
1011 )
1012 );
1013
1014 testPromise("follows the spec for listenables when ending the source", () => {
1015 let end_: talkbackT = Close;
1016
1017 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.fromValue(0)))
1018 |> Js.Promise.then_(x =>
1019 expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
1020 );
1021 });
1022
1023 testPromise(
1024 "ends itself and source when its talkback receives the End signal", () => {
1025 let end_: talkbackT = Close;
1026
1027 Wonka_thelpers.testTalkbackEnd(Wonka.takeUntil(Wonka.never))
1028 |> Js.Promise.then_(x =>
1029 expect(x)
1030 |> toEqual(([|end_|], [|Push(1)|]))
1031 |> Js.Promise.resolve
1032 );
1033 });
1034 });
1035
1036 describe("skip", () => {
1037 open Expect;
1038
1039 it(
1040 "only lets values through after a number of values have been filtered out",
1041 () => {
1042 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1043 let num = ref(1);
1044
1045 let source =
1046 Wonka.skip(2, sink =>
1047 sink(.
1048 Start(
1049 (. signal) =>
1050 switch (signal) {
1051 | Pull when num^ <= 4 =>
1052 let i = num^;
1053 num := num^ + 1;
1054 sink(. Push(i));
1055 | Pull => sink(. End)
1056 | _ => ()
1057 },
1058 ),
1059 )
1060 );
1061
1062 let res = [||];
1063
1064 source((. signal) =>
1065 switch (signal) {
1066 | Start(x) => talkback := x
1067 | _ => ignore(Js.Array.push(signal, res))
1068 }
1069 );
1070
1071 talkback^(. Pull);
1072 talkback^(. Pull);
1073 talkback^(. Pull);
1074 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1075 });
1076
1077 testPromise("follows the spec for listenables", () =>
1078 Wonka_thelpers.testWithListenable(Wonka.skip(0))
1079 |> Js.Promise.then_(x =>
1080 expect(x)
1081 |> toEqual(([||], [|Push(1), Push(2), End|]))
1082 |> Js.Promise.resolve
1083 )
1084 );
1085
1086 testPromise(
1087 "follows the spec for listenables when skipping the source", () =>
1088 Wonka_thelpers.testWithListenable(Wonka.skip(10))
1089 |> Js.Promise.then_(x =>
1090 expect(x)
1091 |> toEqual(([|Pull, Pull|], [|End|]))
1092 |> Js.Promise.resolve
1093 )
1094 );
1095
1096 testPromise(
1097 "ends itself and source when its talkback receives the End signal", () => {
1098 let end_: talkbackT = Close;
1099
1100 Wonka_thelpers.testTalkbackEnd(Wonka.skip(10))
1101 |> Js.Promise.then_(x =>
1102 expect(x)
1103 |> toEqual(([|Pull, end_|], [||]))
1104 |> Js.Promise.resolve
1105 );
1106 });
1107 });
1108
1109 describe("skipWhile", () => {
1110 open Expect;
1111
1112 it(
1113 "only lets values through after the predicate returned false, including the first such value",
1114 () => {
1115 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1116 let num = ref(1);
1117
1118 let source =
1119 Wonka.skipWhile(
1120 (. x) => x <= 2,
1121 sink =>
1122 sink(.
1123 Start(
1124 (. signal) =>
1125 switch (signal) {
1126 | Pull when num^ <= 4 =>
1127 let i = num^;
1128 num := num^ + 1;
1129 sink(. Push(i));
1130 | Pull => sink(. End)
1131 | _ => ()
1132 },
1133 ),
1134 ),
1135 );
1136
1137 let res = [||];
1138
1139 source((. signal) =>
1140 switch (signal) {
1141 | Start(x) => talkback := x
1142 | _ => ignore(Js.Array.push(signal, res))
1143 }
1144 );
1145
1146 talkback^(. Pull);
1147 talkback^(. Pull);
1148 talkback^(. Pull);
1149 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1150 },
1151 );
1152
1153 testPromise("follows the spec for listenables", () =>
1154 Wonka_thelpers.testWithListenable(Wonka.skipWhile((. _) => false))
1155 |> Js.Promise.then_(x =>
1156 expect(x)
1157 |> toEqual(([||], [|Push(1), Push(2), End|]))
1158 |> Js.Promise.resolve
1159 )
1160 );
1161
1162 testPromise(
1163 "follows the spec for listenables when skipping the source", () =>
1164 Wonka_thelpers.testWithListenable(Wonka.skipWhile((. _) => true))
1165 |> Js.Promise.then_(x =>
1166 expect(x)
1167 |> toEqual(([|Pull, Pull|], [|End|]))
1168 |> Js.Promise.resolve
1169 )
1170 );
1171
1172 testPromise(
1173 "ends itself and source when its talkback receives the End signal", () => {
1174 let end_: talkbackT = Close;
1175
1176 Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((. _) => false))
1177 |> Js.Promise.then_(x =>
1178 expect(x)
1179 |> toEqual(([|end_|], [|Push(1)|]))
1180 |> Js.Promise.resolve
1181 );
1182 });
1183 });
1184
1185 describe("skipUntil", () => {
1186 open Expect;
1187
1188 it("only lets values through after the notifier emits a value", () => {
1189 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1190 let notify = ref((_: Wonka_types.talkbackT) => ());
1191 let num = ref(1);
1192
1193 let notifier = sink => {
1194 notify :=
1195 (
1196 signal =>
1197 switch (signal) {
1198 | Pull => sink(. Push(0))
1199 | _ => ()
1200 }
1201 );
1202
1203 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1204 };
1205
1206 let source =
1207 Wonka.skipUntil(notifier, sink =>
1208 sink(.
1209 Start(
1210 (. signal) =>
1211 switch (signal) {
1212 | Pull when num^ <= 4 =>
1213 let i = num^;
1214 if (i === 3) {
1215 notify^(Pull);
1216 };
1217 num := num^ + 1;
1218 sink(. Push(i));
1219 | Pull => sink(. End)
1220 | _ => ()
1221 },
1222 ),
1223 )
1224 );
1225
1226 let res = [||];
1227
1228 source((. signal) =>
1229 switch (signal) {
1230 | Start(x) => talkback := x
1231 | _ => ignore(Js.Array.push(signal, res))
1232 }
1233 );
1234
1235 talkback^(. Pull);
1236 talkback^(. Pull);
1237 talkback^(. Pull);
1238 talkback^(. Pull);
1239
1240 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1241 });
1242
1243 it(
1244 "accepts the end of the source when max number of emissions is not reached",
1245 () => {
1246 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1247 let num = ref(1);
1248 let notifier = sink =>
1249 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1250
1251 let source =
1252 Wonka.skipUntil(notifier, sink =>
1253 sink(.
1254 Start(
1255 (. signal) =>
1256 switch (signal) {
1257 | Pull =>
1258 let i = num^;
1259 if (i < 2) {
1260 num := num^ + 1;
1261 sink(. Push(i));
1262 } else {
1263 sink(. End);
1264 };
1265 | _ => ()
1266 },
1267 ),
1268 )
1269 );
1270
1271 let res = [||];
1272
1273 source((. signal) =>
1274 switch (signal) {
1275 | Start(x) => talkback := x
1276 | _ => ignore(Js.Array.push(signal, res))
1277 }
1278 );
1279
1280 talkback^(. Pull);
1281 talkback^(. Pull);
1282 talkback^(. Pull);
1283
1284 expect(res) |> toEqual([|End|]);
1285 });
1286
1287 testPromise("follows the spec for listenables", () =>
1288 Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.never))
1289 |> Js.Promise.then_(x =>
1290 expect(x)
1291 |> toEqual(([|Pull, Pull, Pull|], [|End|]))
1292 |> Js.Promise.resolve
1293 )
1294 );
1295
1296 testPromise(
1297 "follows the spec for listenables when skipping the source", () =>
1298 Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.fromValue(0)))
1299 |> Js.Promise.then_(x =>
1300 expect(x)
1301 |> toEqual(([|Pull|], [|Push(1), Push(2), End|]))
1302 |> Js.Promise.resolve
1303 )
1304 );
1305
1306 testPromise(
1307 "ends itself and source when its talkback receives the End signal", () => {
1308 let end_: talkbackT = Close;
1309
1310 Wonka_thelpers.testTalkbackEnd(Wonka.skipUntil(Wonka.fromValue(0)))
1311 |> Js.Promise.then_(x =>
1312 expect(x)
1313 |> toEqual(([|Pull, end_|], [|Push(1)|]))
1314 |> Js.Promise.resolve
1315 );
1316 });
1317 });
1318
1319 describe("flatten", () =>
1320 Expect.(
1321 it("merges the result of multiple pullables into its source", () => {
1322 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1323 let source =
1324 Wonka.fromList([Wonka.fromList([1, 2]), Wonka.fromList([1, 2])])
1325 |> Wonka.flatten;
1326
1327 let res = [||];
1328
1329 source((. signal) =>
1330 switch (signal) {
1331 | Start(x) => talkback := x
1332 | _ => ignore(Js.Array.push(signal, res))
1333 }
1334 );
1335
1336 talkback^(. Pull);
1337 talkback^(. Pull);
1338 talkback^(. Pull);
1339 talkback^(. Pull);
1340 talkback^(. Pull);
1341 expect(res)
1342 |> toEqual([|Push(1), Push(2), Push(1), Push(2), End|]);
1343 })
1344 )
1345 );
1346
1347 describe("switchMap", () => {
1348 afterEach(() => Jest.useRealTimers());
1349 open Expect;
1350 open! Expect.Operators;
1351
1352 it("maps from a source and switches to a new source", () => {
1353 let a = Wonka.fromList([1, 2, 3]);
1354
1355 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1356 let signals = [||];
1357 let source = Wonka.switchMap((. x) => Wonka.fromList([x * x]), a);
1358
1359 source((. signal) =>
1360 switch (signal) {
1361 | Start(x) =>
1362 talkback := x;
1363 x(. Pull);
1364 | Push(_) =>
1365 ignore(Js.Array.push(signal, signals));
1366 talkback^(. Pull);
1367 | End => ignore(Js.Array.push(signal, signals))
1368 }
1369 );
1370
1371 expect(signals) == [|Push(1), Push(4), Push(9), End|];
1372 });
1373
1374 it("unsubscribes from previous subscriptions", () => {
1375 Jest.useFakeTimers();
1376
1377 let a = Wonka.interval(100);
1378
1379 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1380 let signals = [||];
1381 let source =
1382 Wonka.switchMap((. _) => Wonka.interval(25), a) |> Wonka.take(5);
1383
1384 source((. signal) =>
1385 switch (signal) {
1386 | Start(x) =>
1387 talkback := x;
1388 x(. Pull);
1389 | Push(_) =>
1390 ignore(Js.Array.push(signal, signals));
1391 talkback^(. Pull);
1392 | End => ignore(Js.Array.push(signal, signals))
1393 }
1394 );
1395
1396 Jest.runTimersToTime(300);
1397
1398 expect(signals)
1399 == [|Push(0), Push(1), Push(2), Push(0), Push(1), End|];
1400 });
1401
1402 testPromise("follows the spec for listenables", () =>
1403 Wonka_thelpers.testWithListenable(source =>
1404 Wonka.switchMap((. x) => x, Wonka.fromList([source]))
1405 )
1406 |> Js.Promise.then_(x =>
1407 expect(x)
1408 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
1409 |> Js.Promise.resolve
1410 )
1411 );
1412
1413 testPromise(
1414 "ends itself and source when its talkback receives the End signal", () =>
1415 Wonka_thelpers.testTalkbackEnd(source =>
1416 Wonka.switchMap((. x) => x, Wonka.fromList([source]))
1417 )
1418 |> Js.Promise.then_(x =>
1419 expect(x)
1420 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
1421 |> Js.Promise.resolve
1422 )
1423 );
1424 });
1425});
1426
describe("sink factories", () => {
  describe("forEach", () =>
    Expect.(
      it("calls a function for each emission of the passed source", () => {
        let emitted = ref(0);
        let collected = [||];

        /* Hand-written pull source: the first four pulls are answered with
           0, 1, 2, 3 and any later pull is answered with End. Other
           talkback signals are ignored. */
        let source = sink =>
          sink(.
            Start(
              (. signal) =>
                switch (signal) {
                | Pull =>
                  if (emitted^ < 4) {
                    let value = emitted^;
                    emitted := value + 1;
                    sink(. Push(value));
                  } else {
                    sink(. End);
                  }
                | _ => ()
                },
            ),
          );

        Wonka.forEach((. x) => ignore(Js.Array.push(x, collected)), source);
        expect(collected) |> toEqual([|0, 1, 2, 3|]);
      })
    )
  );

  describe("subscribe", () =>
    Expect.(
      it(
        "calls a function for each emission of the passed source and stops when unsubscribed",
        () => {
          let counter = ref(0);
          let collected = [||];
          /* Filled in by the source on subscription; calling it pushes the
             next counter value into the subscribed sink. */
          let emitNext = ref(() => ());

          let source = sink => {
            emitNext :=
              (
                () => {
                  let value = counter^;
                  counter := value + 1;
                  sink(. Push(value));
                }
              );

            sink(. Start(Wonka_helpers.talkbackPlaceholder));
          };

          let {unsubscribe} =
            Wonka.subscribe(
              (. x) => ignore(Js.Array.push(x, collected)),
              source,
            );

          /* Two emissions while subscribed... */
          emitNext^();
          emitNext^();
          unsubscribe();
          /* ...and two more after unsubscribing, which must not be seen. */
          emitNext^();
          emitNext^();

          expect(collected) |> toEqual([|0, 1|]);
        },
      )
    )
  );
});
1496
/* Integration test: runs 1000 integers through fromArray -> map -> forEach
   and checks that the collected strings match a plain Array.map over the
   same input. */
describe("chains (integration)", () =>
  Expect.(
    it("fromArray, map, forEach", () => {
      /* input is [|0, 1, ..., 999|] */
      let input = Array.mapi((i, _) => i, Array.make(1000, 1));
      /* Expected result. Array.map must be applied to `input`; without the
         array argument `output` is only a partially applied function. */
      let output = Array.map(x => string_of_int(x), input);
      let actual = [||];

      input
      |> Wonka.fromArray
      |> Wonka.map((. x) => string_of_int(x))
      |> Wonka.forEach((. x) => ignore(Js.Array.push(x, actual)));

      /* Compare what the chain produced against the expectation, rather than
         comparing the expectation with itself. */
      expect(actual) |> toEqual(output);
    })
  )
);
1513
describe("subject", () => {
  open Expect;
  open! Expect.Operators;

  it("sends values passed to .next to puller sinks", () => {
    let received = [||];
    let subject = Wonka.makeSubject();

    /* Record every Push and the End; the talkback is not needed here. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => ignore(Js.Array.push(signal, received))
      }
    );

    subject.next(10);
    subject.next(20);
    subject.next(30);
    subject.next(40);
    subject.complete();

    expect(received) == [|Push(10), Push(20), Push(30), Push(40), End|];
  });

  it("handles multiple sinks", () => {
    let firstTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let firstSignals = [||];
    let secondSignals = [||];

    let subject = Wonka.makeSubject();

    /* First sink keeps its talkback so it can be closed part-way through. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => ignore(Js.Array.push(signal, firstSignals))
      }
    );

    /* Second sink stays subscribed until the subject completes. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => ignore(Js.Array.push(signal, secondSignals))
      }
    );

    subject.next(10);
    subject.next(20);
    subject.next(30);

    firstTalkback^(. Close);

    subject.next(40);
    subject.next(50);

    subject.complete();

    expect((firstSignals, secondSignals))
    == (
         [|Push(10), Push(20), Push(30)|],
         [|Push(10), Push(20), Push(30), Push(40), Push(50), End|],
       );
  });

  it("handles multiple sinks that subscribe and close at different times", () => {
    let firstTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let secondTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let firstSignals = [||];
    let secondSignals = [||];

    let subject = Wonka.makeSubject();

    /* Emitted before anyone has subscribed. */
    subject.next(10);
    subject.next(20);

    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => ignore(Js.Array.push(signal, firstSignals))
      }
    );

    subject.next(30);

    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => secondTalkback := tb
      | Push(_)
      | End => ignore(Js.Array.push(signal, secondSignals))
      }
    );

    subject.next(40);
    subject.next(50);

    secondTalkback^(. Close);

    subject.next(60);

    firstTalkback^(. Close);

    /* Emitted after both sinks have closed. */
    subject.next(70);
    subject.complete();

    expect((firstSignals, secondSignals))
    == (
         [|Push(30), Push(40), Push(50), Push(60)|],
         [|Push(40), Push(50)|],
       );
  });
});
1629
describe("web operators", () => {
  /* Subscribes to `source` as an eager puller: it pulls once when the
     talkback arrives and once more after every Push. Each Push and the final
     End are appended, in arrival order, to the returned array, which keeps
     filling as fake timers are advanced after this function returns. */
  let drain = source => {
    let upstream = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        upstream := tb;
        tb(. Pull);
      | Push(_) =>
        ignore(Js.Array.push(signal, seen));
        upstream^(. Pull);
      | End => ignore(Js.Array.push(signal, seen))
      }
    );

    seen;
  };

  describe("delay", () => {
    open Expect;
    open! Expect.Operators;

    afterEach(() => Jest.useRealTimers());

    it("should not emit values before specified delay", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.fromList([1, 2, 3])
        |> WonkaJs.delay(200)
        |> Wonka.take(3)
        |> drain;

      /* No timers have been advanced, so nothing may have arrived yet. */
      expect(signals) == [||];
    });

    it("should emit values after specified delay", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.fromList([1, 2, 3])
        |> WonkaJs.delay(200)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(400);

      expect(signals) == [|Push(1), Push(2)|];
    });

    it("should emit an End signal when the source has emitted all values", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.fromList([1, 2, 3])
        |> WonkaJs.delay(200)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(600);

      expect(signals) == [|Push(1), Push(2), Push(3), End|];
    });
  });

  describe("throttle", () => {
    open Expect;
    open! Expect.Operators;

    afterEach(() => Jest.useRealTimers());

    it(
      "should not emit values before specified throttle (but include values on leading edge)",
      () => {
        Jest.useFakeTimers();

        let signals =
          Wonka.interval(100)
          |> WonkaJs.throttle((. _) => 600)
          |> Wonka.take(3)
          |> drain;

        Jest.runTimersToTime(400);

        expect(signals) == [|Push(0)|];
      },
    );

    it("should throttle emissions by the specified throttle", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.interval(100)
        |> WonkaJs.throttle((. _) => 600)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(1000);

      expect(signals) == [|Push(0), Push(7)|];
    });

    it("should emit an End signal when the source has emitted all values", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.interval(100)
        |> WonkaJs.throttle((. _) => 600)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(1500);

      expect(signals) == [|Push(0), Push(7), Push(14), End|];
    });
  });

  describe("debounce", () => {
    open Expect;
    open! Expect.Operators;

    afterEach(() => Jest.useRealTimers());

    it(
      "should not emit values if emitted before the debounce specified by the duration selector",
      () => {
        Jest.useFakeTimers();

        let signals =
          Wonka.fromList([1, 2, 3])
          |> WonkaJs.debounce((. _) => 1000)
          |> Wonka.take(3)
          |> drain;

        Jest.runTimersToTime(500);

        expect(signals) == [||];
      },
    );

    it("should debounce emissions based on the duration selector", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.fromList([1, 2, 3])
        |> WonkaJs.debounce((. _) => 1000)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(2000);

      expect(signals) == [|Push(1), Push(2)|];
    });

    it("should emit an End signal when the source has emitted all values", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.fromList([1, 2, 3])
        |> WonkaJs.debounce((. _) => 1000)
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(3000);

      expect(signals) == [|Push(1), Push(2), Push(3), End|];
    });
  });

  describe("sample", () => {
    open Expect;
    open! Expect.Operators;

    afterEach(() => Jest.useRealTimers());

    it("should sample the last emitted value from a source", () => {
      Jest.useFakeTimers();

      /* A 50ms interval sampled by a 100ms notifier interval. */
      let signals =
        Wonka.interval(50) |> WonkaJs.sample(Wonka.interval(100)) |> drain;

      Jest.runTimersToTime(200);

      expect(signals) == [|Push(1), Push(3)|];
    });

    it("should emit an End signal when the source has emitted all values", () => {
      Jest.useFakeTimers();

      let signals =
        Wonka.interval(50)
        |> WonkaJs.sample(Wonka.interval(100))
        |> Wonka.take(3)
        |> drain;

      Jest.runTimersToTime(300);

      expect(signals) == [|Push(1), Push(3), Push(5), End|];
    });
  });
});