1open Jest;
2open Wonka_types;
3
/* Alias so the specs below can be written as `it(...)`; `test` is presumably
   the one brought into scope by `open Jest` above - confirm. */
let it = test;
5
6describe("source factories", () => {
7 describe("fromList", () => {
8 open Expect;
9 open! Expect.Operators;
10
11 it("sends list items to a puller sink", () => {
12 let source = Wonka.fromList([10, 20, 30]);
13 let talkback = ref((. _: Wonka_types.talkbackT) => ());
14 let signals = [||];
15
16 source((. signal) =>
17 switch (signal) {
18 | Start(x) => talkback := x
19 | Push(_) => ignore(Js.Array.push(signal, signals))
20 | End => ignore(Js.Array.push(signal, signals))
21 }
22 );
23
24 talkback^(. Pull);
25 talkback^(. Pull);
26 talkback^(. Pull);
27 talkback^(. Pull);
28
29 expect(signals) == [|Push(10), Push(20), Push(30), End|];
30 });
31 });
32
33 describe("fromArray", () => {
34 open Expect;
35 open! Expect.Operators;
36
37 it("sends array items to a puller sink", () => {
38 let source = Wonka.fromArray([|10, 20, 30|]);
39 let talkback = ref((. _: Wonka_types.talkbackT) => ());
40 let signals = ref([||]);
41
42 source((. signal) =>
43 switch (signal) {
44 | Start(x) =>
45 talkback := x;
46 x(. Pull);
47 | Push(_) =>
48 signals := Array.append(signals^, [|signal|]);
49 talkback^(. Pull);
50 | End => signals := Array.append(signals^, [|signal|])
51 }
52 );
53
54 expect(signals^) == [|Push(10), Push(20), Push(30), End|];
55 });
56
57 it("does not blow up the stack when iterating something huge", () => {
58 let arr = Array.make(100000, 123);
59 let source = Wonka.fromArray(arr);
60 let talkback = ref((. _: Wonka_types.talkbackT) => ());
61 let values = [||];
62
63 source((. signal) =>
64 switch (signal) {
65 | Start(x) =>
66 talkback := x;
67 x(. Pull);
68 | Push(x) =>
69 ignore(Js.Array.push(x, values));
70 talkback^(. Pull);
71 | End => ()
72 }
73 );
74
75 expect(Array.length(values)) == Array.length(arr);
76 });
77 });
78
79 describe("fromValue", () => {
80 open Expect;
81 open! Expect.Operators;
82
83 it("sends a single item to a puller sink", () => {
84 let source = Wonka.fromValue(123);
85 let talkback = ref((. _: Wonka_types.talkbackT) => ());
86 let signals = [||];
87
88 source((. signal) =>
89 switch (signal) {
90 | Start(x) => talkback := x
91 | Push(_) => ignore(Js.Array.push(signal, signals))
92 | End => ignore(Js.Array.push(signal, signals))
93 }
94 );
95
96 talkback^(. Pull);
97 talkback^(. Pull);
98 talkback^(. Pull); /* one extra to check whether no signal comes back after it has ended */
99
100 expect(signals) == [|Push(123), End|];
101 });
102 });
103
104 describe("empty", () => {
105 open Expect;
106 open! Expect.Operators;
107
108 it("ends immediately", () => {
109 let talkback = ref((. _: Wonka_types.talkbackT) => ());
110 let signals = [||];
111
112 Wonka.empty((. signal) =>
113 switch (signal) {
114 | Start(x) => talkback := x
115 | _ => ignore(Js.Array.push(signal, signals))
116 }
117 );
118
119 let _signals = Array.copy(signals);
120
121 talkback^(. Pull);
122 talkback^(. Pull);
123
124 expect((_signals, signals)) == ([|End|], [|End|]);
125 });
126 });
127
128 describe("never", () => {
129 open Expect;
130 open! Expect.Operators;
131
132 it("does not end", () => {
133 let talkback = ref((. _: Wonka_types.talkbackT) => ());
134 let ended = ref(false);
135
136 Wonka.never((. signal) =>
137 switch (signal) {
138 | Start(x) => talkback := x
139 | End => ended := true
140 | _ => ()
141 }
142 );
143
144 talkback^(. Pull);
145 talkback^(. Pull);
146
147 expect(ended^) === false;
148 });
149 });
150
151 describe("fromObservable", () =>
152 Expect.(
153 testPromise("creates a source from an observable", () => {
154 let observable = Wonka_thelpers.observableFromArray([|1, 2|]);
155
156 Wonka_thelpers.testSource(Wonka.fromObservable(observable))
157 |> Js.Promise.then_(x =>
158 expect(x)
159 |> toEqual([|Push(1), Push(2), End|])
160 |> Js.Promise.resolve
161 );
162 })
163 )
164 );
165
166 describe("fromCallbag", () => {
167 open Expect;
168
169 testPromise("creates a source from a callbag (observable)", () => {
170 let observable = Wonka_thelpers.observableFromArray([|1, 2|]);
171 let callbag = Wonka_thelpers.callbagFromObservable(observable);
172
173 Wonka_thelpers.testSource(Wonka.fromCallbag(callbag))
174 |> Js.Promise.then_(x =>
175 expect(x)
176 |> toEqual([|Push(1), Push(2), End|])
177 |> Js.Promise.resolve
178 );
179 });
180
181 testPromise("creates a source from a callbag (iterable)", () => {
182 let callbag = Wonka_thelpers.callbagFromArray([|1, 2|]);
183
184 Wonka_thelpers.testSource(Wonka.fromCallbag(callbag))
185 |> Js.Promise.then_(x =>
186 expect(x)
187 |> toEqual([|Push(1), Push(2), End|])
188 |> Js.Promise.resolve
189 );
190 });
191 });
192});
193
194describe("operator factories", () => {
195 describe("map", () => {
196 open Expect;
197
198 it("maps all emissions of a source", () => {
199 let num = ref(1);
200 let nums = [||];
201 let talkback = ref((. _: Wonka_types.talkbackT) => ());
202
203 Wonka.map(
204 (. _) => {
205 let res = num^;
206 num := num^ + 1;
207 res;
208 },
209 sink =>
210 sink(.
211 Start(
212 (. signal) =>
213 switch (signal) {
214 | Pull => sink(. Push(1))
215 | _ => ()
216 },
217 ),
218 ),
219 (. signal) =>
220 switch (signal) {
221 | Start(x) =>
222 talkback := x;
223 x(. Pull);
224 | Push(x) when num^ < 6 =>
225 ignore(Js.Array.push(x, nums));
226 talkback^(. Pull);
227 | _ => ()
228 },
229 );
230
231 expect(nums) |> toEqual([|1, 2, 3, 4|]);
232 });
233
234 testPromise("follows the spec for listenables", () =>
235 Wonka_thelpers.testWithListenable(Wonka.map((. x) => x))
236 |> Js.Promise.then_(x =>
237 expect(x)
238 |> toEqual(([||], [|Push(1), Push(2), End|]))
239 |> Js.Promise.resolve
240 )
241 );
242
243 testPromise(
244 "ends itself and source when its talkback receives the End signal", () => {
245 let end_: talkbackT = Close;
246
247 Wonka_thelpers.testTalkbackEnd(Wonka.map((. x) => x))
248 |> Js.Promise.then_(x =>
249 expect(x)
250 |> toEqual(([|end_|], [|Push(1)|]))
251 |> Js.Promise.resolve
252 );
253 });
254 });
255
256 describe("filter", () => {
257 open Expect;
258
259 it("filters emissions according to a predicate", () => {
260 let i = ref(1);
261 let nums = [||];
262 let talkback = ref((. _: Wonka_types.talkbackT) => ());
263
264 Wonka.filter(
265 (. x) => x mod 2 === 0,
266 sink =>
267 sink(.
268 Start(
269 (. signal) =>
270 switch (signal) {
271 | Pull =>
272 let num = i^;
273 i := i^ + 1;
274 sink(. Push(num));
275 | _ => ()
276 },
277 ),
278 ),
279 (. signal) =>
280 switch (signal) {
281 | Start(x) =>
282 talkback := x;
283 x(. Pull);
284 | Push(x) when x < 6 =>
285 ignore(Js.Array.push(x, nums));
286 talkback^(. Pull);
287 | _ => ()
288 },
289 );
290
291 expect(nums) |> toEqual([|2, 4|]);
292 });
293
294 testPromise("follows the spec for listenables", () =>
295 Wonka_thelpers.testWithListenable(Wonka.filter((. _) => true))
296 |> Js.Promise.then_(x =>
297 expect(x)
298 |> toEqual(([||], [|Push(1), Push(2), End|]))
299 |> Js.Promise.resolve
300 )
301 );
302
303 testPromise("follows the spec for listenables when filtering", () =>
304 Wonka_thelpers.testWithListenable(Wonka.filter((. _) => false))
305 |> Js.Promise.then_(x =>
306 expect(x)
307 |> toEqual(([|Pull, Pull|], [|End|]))
308 |> Js.Promise.resolve
309 )
310 );
311
312 testPromise(
313 "ends itself and source when its talkback receives the End signal", () => {
314 let end_: talkbackT = Close;
315
316 Wonka_thelpers.testTalkbackEnd(Wonka.filter((. _) => true))
317 |> Js.Promise.then_(x =>
318 expect(x)
319 |> toEqual(([|end_|], [|Push(1)|]))
320 |> Js.Promise.resolve
321 );
322 });
323 });
324
325 describe("sample", () => {
326 open Expect;
327 open! Expect.Operators;
328
329 afterEach(() => Jest.useRealTimers());
330
331 it("should sample the last emitted value from a source", () => {
332 Jest.useFakeTimers();
333 let a = Wonka.interval(50);
334
335 let talkback = ref((. _: Wonka_types.talkbackT) => ());
336 let signals = [||];
337
338 let source = Wonka.sample(Wonka.interval(100), a);
339
340 source((. signal) =>
341 switch (signal) {
342 | Start(x) =>
343 talkback := x;
344 x(. Pull);
345 | Push(_) =>
346 ignore(Js.Array.push(signal, signals));
347 talkback^(. Pull);
348 | End => ignore(Js.Array.push(signal, signals))
349 }
350 );
351
352 Jest.runTimersToTime(200);
353
354 expect(signals) == [|Push(1), Push(3)|];
355 });
356
357 it("should emit an End signal when the source has emitted all values", () => {
358 Jest.useFakeTimers();
359 let a = Wonka.interval(50);
360
361 let talkback = ref((. _: Wonka_types.talkbackT) => ());
362 let signals = [||];
363
364 let source = Wonka.sample(Wonka.interval(100), a) |> Wonka.take(3);
365
366 source((. signal) =>
367 switch (signal) {
368 | Start(x) =>
369 talkback := x;
370 x(. Pull);
371 | Push(_) =>
372 ignore(Js.Array.push(signal, signals));
373 talkback^(. Pull);
374 | End => ignore(Js.Array.push(signal, signals))
375 }
376 );
377
378 Jest.runTimersToTime(300);
379
380 expect(signals) == [|Push(1), Push(3), Push(5), End|];
381 });
382 });
383
384 describe("scan", () => {
385 open Expect;
386
387 it("folds emissions using an initial seed value", () => {
388 let talkback = ref((. _: Wonka_types.talkbackT) => ());
389 let num = ref(1);
390
391 let source =
392 Wonka.scan(
393 (. acc, x) => acc + x,
394 0,
395 sink =>
396 sink(.
397 Start(
398 (. signal) =>
399 switch (signal) {
400 | Pull =>
401 let i = num^;
402 if (i <= 3) {
403 num := num^ + 1;
404 sink(. Push(i));
405 } else {
406 sink(. End);
407 };
408 | _ => ()
409 },
410 ),
411 ),
412 );
413
414 let res = [||];
415
416 source((. signal) =>
417 switch (signal) {
418 | Start(x) => talkback := x
419 | _ => ignore(Js.Array.push(signal, res))
420 }
421 );
422
423 talkback^(. Pull);
424 talkback^(. Pull);
425 talkback^(. Pull);
426 talkback^(. Pull);
427 expect(res) |> toEqual([|Push(1), Push(3), Push(6), End|]);
428 });
429
430 testPromise("follows the spec for listenables", () =>
431 Wonka_thelpers.testWithListenable(Wonka.scan((. _, x) => x, 0))
432 |> Js.Promise.then_(x =>
433 expect(x)
434 |> toEqual(([||], [|Push(1), Push(2), End|]))
435 |> Js.Promise.resolve
436 )
437 );
438
439 testPromise(
440 "ends itself and source when its talkback receives the End signal", () => {
441 let end_: talkbackT = Close;
442
443 Wonka_thelpers.testTalkbackEnd(Wonka.scan((. _, x) => x, 0))
444 |> Js.Promise.then_(x =>
445 expect(x)
446 |> toEqual(([|end_|], [|Push(1)|]))
447 |> Js.Promise.resolve
448 );
449 });
450 });
451
452 describe("merge", () => {
453 open Expect;
454 open! Expect.Operators;
455
456 it("merges different sources into a single one", () => {
457 let a = Wonka.fromList([1, 2, 3]);
458 let b = Wonka.fromList([4, 5, 6]);
459 let talkback = ref((. _: Wonka_types.talkbackT) => ());
460 let signals = [||];
461 let source = Wonka.merge([|a, b|]);
462
463 source((. signal) =>
464 switch (signal) {
465 | Start(x) =>
466 talkback := x;
467 x(. Pull);
468 | Push(_) =>
469 ignore(Js.Array.push(signal, signals));
470 talkback^(. Pull);
471 | End => ignore(Js.Array.push(signal, signals))
472 }
473 );
474
475 expect(signals)
476 == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
477 });
478
479 testPromise("follows the spec for listenables", () =>
480 Wonka_thelpers.testWithListenable(source => Wonka.merge([|source|]))
481 |> Js.Promise.then_(x =>
482 expect(x)
483 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
484 |> Js.Promise.resolve
485 )
486 );
487
488 testPromise(
489 "ends itself and source when its talkback receives the End signal", () =>
490 Wonka_thelpers.testTalkbackEnd(source => Wonka.merge([|source|]))
491 |> Js.Promise.then_(x =>
492 expect(x)
493 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
494 |> Js.Promise.resolve
495 )
496 );
497 });
498
499 describe("concat", () => {
500 open Expect;
501 open! Expect.Operators;
502
503 it("concatenates different sources into a single one", () => {
504 let a = Wonka.fromList([1, 2, 3]);
505 let b = Wonka.fromList([4, 5, 6]);
506 let talkback = ref((. _: Wonka_types.talkbackT) => ());
507 let signals = [||];
508 let source = Wonka.concat([|a, b|]);
509
510 source((. signal) =>
511 switch (signal) {
512 | Start(x) =>
513 talkback := x;
514 x(. Pull);
515 | Push(_) =>
516 ignore(Js.Array.push(signal, signals));
517 talkback^(. Pull);
518 | End => ignore(Js.Array.push(signal, signals))
519 }
520 );
521
522 expect(signals)
523 == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
524 });
525
526 testPromise("follows the spec for listenables", () =>
527 Wonka_thelpers.testWithListenable(source => Wonka.concat([|source|]))
528 |> Js.Promise.then_(x =>
529 expect(x)
530 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
531 |> Js.Promise.resolve
532 )
533 );
534
535 testPromise(
536 "ends itself and source when its talkback receives the End signal", () =>
537 Wonka_thelpers.testTalkbackEnd(source => Wonka.concat([|source|]))
538 |> Js.Promise.then_(x =>
539 expect(x)
540 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
541 |> Js.Promise.resolve
542 )
543 );
544 });
545
546 describe("share", () => {
547 open Expect;
548
549 it("shares an underlying source with all sinks", () => {
550 let talkback = ref((. _: Wonka_types.talkbackT) => ());
551 let aborterTb = ref((. _: Wonka_types.talkbackT) => ());
552 let num = ref(1);
553 let nums = [||];
554
555 let source =
556 Wonka.share(sink =>
557 sink(.
558 Start(
559 (. signal) =>
560 switch (signal) {
561 | Pull =>
562 let i = num^;
563 if (i <= 2) {
564 num := num^ + 1;
565 sink(. Push(i));
566 } else {
567 sink(. End);
568 };
569 | _ => ()
570 },
571 ),
572 )
573 );
574
575 source((. signal) =>
576 switch (signal) {
577 | Start(x) => talkback := x
578 | _ => ignore(Js.Array.push(signal, nums))
579 }
580 );
581
582 source((. signal) =>
583 switch (signal) {
584 | Start(_) => ()
585 | _ => ignore(Js.Array.push(signal, nums))
586 }
587 );
588
589 source((. signal) =>
590 switch (signal) {
591 | Start(tb) => aborterTb := tb
592 | _ =>
593 ignore(Js.Array.push(signal, nums));
594 aborterTb^(. Close);
595 }
596 );
597
598 talkback^(. Pull);
599 let numsA = Array.copy(nums);
600 talkback^(. Pull);
601 talkback^(. Pull);
602 talkback^(. Pull);
603 expect((numsA, nums))
604 |> toEqual((
605 [|Push(1), Push(1), Push(1)|],
606 [|Push(1), Push(1), Push(1), Push(2), Push(2), End, End|],
607 ));
608 });
609
610 testPromise("follows the spec for listenables", () =>
611 Wonka_thelpers.testWithListenable(Wonka.share)
612 |> Js.Promise.then_(x =>
613 expect(x)
614 |> toEqual(([||], [|Push(1), Push(2), End|]))
615 |> Js.Promise.resolve
616 )
617 );
618
619 testPromise(
620 "ends itself and source when its talkback receives the End signal", () => {
621 let end_: talkbackT = Close;
622
623 Wonka_thelpers.testTalkbackEnd(Wonka.share)
624 |> Js.Promise.then_(x =>
625 expect(x)
626 |> toEqual(([|end_|], [|Push(1)|]))
627 |> Js.Promise.resolve
628 );
629 });
630 });
631
632 describe("combine", () => {
633 open Expect;
634
635 it("combines the latest values of two sources", () => {
636 let talkback = ref((. _: Wonka_types.talkbackT) => ());
637
638 let makeSource = (factor: int) => {
639 let num = ref(1);
640
641 sink => {
642 sink(.
643 Start(
644 (. signal) =>
645 switch (signal) {
646 | Pull =>
647 if (num^ <= 2) {
648 let i = num^ * factor;
649 num := num^ + 1;
650 sink(. Push(i));
651 } else {
652 sink(. End);
653 }
654 | _ => ()
655 },
656 ),
657 );
658 };
659 };
660
661 let sourceA = makeSource(1);
662 let sourceB = makeSource(2);
663 let source = Wonka.combine(sourceA, sourceB);
664 let res = [||];
665
666 source((. signal) =>
667 switch (signal) {
668 | Start(x) => talkback := x
669 | _ => ignore(Js.Array.push(signal, res))
670 }
671 );
672
673 talkback^(. Pull);
674 talkback^(. Pull);
675 talkback^(. Pull);
676 talkback^(. Pull);
677 expect(res)
678 |> toEqual([|Push((1, 2)), Push((2, 2)), Push((2, 4)), End|]);
679 });
680
681 testPromise("follows the spec for listenables", () =>
682 Wonka_thelpers.testWithListenable(source => {
683 let shared = Wonka.share(source);
684 Wonka.combine(shared, shared);
685 })
686 |> Js.Promise.then_(x =>
687 expect(x)
688 |> toEqual((
689 [||],
690 [|Push((1, 1)), Push((2, 1)), Push((2, 2)), End|],
691 ))
692 |> Js.Promise.resolve
693 )
694 );
695
696 testPromise(
697 "ends itself and source when its talkback receives the End signal", () => {
698 let end_: talkbackT = Close;
699
700 Wonka_thelpers.testTalkbackEnd(source => {
701 let shared = Wonka.share(source);
702 Wonka.combine(shared, shared);
703 })
704 |> Js.Promise.then_(x =>
705 expect(x)
706 |> toEqual(([|end_|], [|Push((1, 1))|]))
707 |> Js.Promise.resolve
708 );
709 });
710 });
711
712 describe("take", () => {
713 open Expect;
714
715 it("only lets a maximum number of values through", () => {
716 let talkback = ref((. _: Wonka_types.talkbackT) => ());
717 let num = ref(1);
718
719 let source =
720 Wonka.take(2, sink =>
721 sink(.
722 Start(
723 (. signal) =>
724 switch (signal) {
725 | Pull =>
726 let i = num^;
727 num := num^ + 1;
728 sink(. Push(i));
729 | _ => ()
730 },
731 ),
732 )
733 );
734
735 let res = [||];
736
737 source((. signal) =>
738 switch (signal) {
739 | Start(x) => talkback := x
740 | _ => ignore(Js.Array.push(signal, res))
741 }
742 );
743
744 talkback^(. Pull);
745 talkback^(. Pull);
746 talkback^(. Pull);
747 talkback^(. Pull);
748 expect(res) |> toEqual([|Push(1), Push(2), End|]);
749 });
750
751 it(
752 "accepts the end of the source when max number of emissions is not reached",
753 () => {
754 let talkback = ref((. _: Wonka_types.talkbackT) => ());
755 let num = ref(1);
756
757 let source =
758 Wonka.take(2, sink =>
759 sink(.
760 Start(
761 (. signal) =>
762 switch (signal) {
763 | Pull =>
764 let i = num^;
765 if (i < 2) {
766 num := num^ + 1;
767 sink(. Push(i));
768 } else {
769 sink(. End);
770 };
771 | _ => ()
772 },
773 ),
774 )
775 );
776
777 let res = [||];
778
779 source((. signal) =>
780 switch (signal) {
781 | Start(x) => talkback := x
782 | _ => ignore(Js.Array.push(signal, res))
783 }
784 );
785
786 talkback^(. Pull);
787 talkback^(. Pull);
788 talkback^(. Pull);
789 expect(res) |> toEqual([|Push(1), End|]);
790 });
791
792 testPromise("follows the spec for listenables", () =>
793 Wonka_thelpers.testWithListenable(Wonka.take(10))
794 |> Js.Promise.then_(x =>
795 expect(x)
796 |> toEqual(([||], [|Push(1), Push(2), End|]))
797 |> Js.Promise.resolve
798 )
799 );
800
801 testPromise("follows the spec for listenables when ending the source", () => {
802 let end_: talkbackT = Close;
803
804 Wonka_thelpers.testWithListenable(Wonka.take(1))
805 |> Js.Promise.then_(x =>
806 expect(x)
807 |> toEqual(([|end_|], [|Push(1), End|]))
808 |> Js.Promise.resolve
809 );
810 });
811
812 testPromise(
813 "ends itself and source when its talkback receives the End signal", () => {
814 let end_: talkbackT = Close;
815
816 Wonka_thelpers.testTalkbackEnd(Wonka.take(10))
817 |> Js.Promise.then_(x =>
818 expect(x)
819 |> toEqual(([|end_|], [|Push(1)|]))
820 |> Js.Promise.resolve
821 );
822 });
823 });
824
825 describe("takeLast", () => {
826 open Expect;
827
828 it("only lets the last n values through on an entirely new source", () => {
829 let talkback = ref((. _: Wonka_types.talkbackT) => ());
830 let num = ref(1);
831
832 let source =
833 Wonka.takeLast(2, sink =>
834 sink(.
835 Start(
836 (. signal) =>
837 switch (signal) {
838 | Pull when num^ <= 4 =>
839 let i = num^;
840 num := num^ + 1;
841 sink(. Push(i));
842 | Pull => sink(. End)
843 | _ => ()
844 },
845 ),
846 )
847 );
848
849 let res = [||];
850
851 source((. signal) =>
852 switch (signal) {
853 | Start(x) => talkback := x
854 | _ => ignore(Js.Array.push(signal, res))
855 }
856 );
857
858 talkback^(. Pull);
859 talkback^(. Pull);
860 talkback^(. Pull);
861 expect(res) |> toEqual([|Push(3), Push(4), End|]);
862 });
863
864 testPromise("follows the spec for listenables", () =>
865 Wonka_thelpers.testWithListenable(Wonka.takeLast(10))
866 |> Js.Promise.then_(x =>
867 expect(x)
868 |> toEqual((
869 [|Pull, Pull, Pull|],
870 [|/* empty since the source is a pullable */|],
871 ))
872 |> Js.Promise.resolve
873 )
874 );
875
876 testPromise(
877 "ends itself and source when its talkback receives the End signal", () =>
878 Wonka_thelpers.testTalkbackEnd(Wonka.takeLast(10))
879 |> Js.Promise.then_(x =>
880 expect(x)
881 |> toEqual(([|Pull, Pull|], [||]))
882 |> Js.Promise.resolve
883 )
884 );
885 });
886
887 describe("takeWhile", () => {
888 open Expect;
889
890 it("only lets the last n values through on an entirely new source", () => {
891 let talkback = ref((. _: Wonka_types.talkbackT) => ());
892 let num = ref(1);
893
894 let source =
895 Wonka.takeWhile(
896 (. x) => x <= 2,
897 sink =>
898 sink(.
899 Start(
900 (. signal) =>
901 switch (signal) {
902 | Pull =>
903 let i = num^;
904 num := num^ + 1;
905 sink(. Push(i));
906 | _ => ()
907 },
908 ),
909 ),
910 );
911
912 let res = [||];
913
914 source((. signal) =>
915 switch (signal) {
916 | Start(x) => talkback := x
917 | _ => ignore(Js.Array.push(signal, res))
918 }
919 );
920
921 talkback^(. Pull);
922 talkback^(. Pull);
923 talkback^(. Pull);
924 talkback^(. Pull);
925
926 expect(res) |> toEqual([|Push(1), Push(2), End|]);
927 });
928
929 it(
930 "accepts the end of the source when max number of emissions is not reached",
931 () => {
932 let talkback = ref((. _: Wonka_types.talkbackT) => ());
933 let num = ref(1);
934
935 let source =
936 Wonka.takeWhile(
937 (. x) => x <= 5,
938 sink =>
939 sink(.
940 Start(
941 (. signal) =>
942 switch (signal) {
943 | Pull =>
944 let i = num^;
945 if (i < 2) {
946 num := num^ + 1;
947 sink(. Push(i));
948 } else {
949 sink(. End);
950 };
951 | _ => ()
952 },
953 ),
954 ),
955 );
956
957 let res = [||];
958
959 source((. signal) =>
960 switch (signal) {
961 | Start(x) => talkback := x
962 | _ => ignore(Js.Array.push(signal, res))
963 }
964 );
965
966 talkback^(. Pull);
967 talkback^(. Pull);
968 talkback^(. Pull);
969
970 expect(res) |> toEqual([|Push(1), End|]);
971 });
972
973 testPromise("follows the spec for listenables", () =>
974 Wonka_thelpers.testWithListenable(Wonka.takeWhile((. _) => true))
975 |> Js.Promise.then_(x =>
976 expect(x)
977 |> toEqual(([||], [|Push(1), Push(2), End|]))
978 |> Js.Promise.resolve
979 )
980 );
981
982 testPromise("follows the spec for listenables when ending the source", () => {
983 let end_: talkbackT = Close;
984
985 Wonka_thelpers.testWithListenable(Wonka.takeWhile((. _) => false))
986 |> Js.Promise.then_(x =>
987 expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
988 );
989 });
990
991 testPromise(
992 "ends itself and source when its talkback receives the End signal", () => {
993 let end_: talkbackT = Close;
994
995 Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((. _) => true))
996 |> Js.Promise.then_(x =>
997 expect(x)
998 |> toEqual(([|end_|], [|Push(1)|]))
999 |> Js.Promise.resolve
1000 );
1001 });
1002 });
1003
1004 describe("takeUntil", () => {
1005 open Expect;
1006
1007 it("only lets the last n values through on an entirely new source", () => {
1008 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1009 let notify = ref((_: Wonka_types.talkbackT) => ());
1010 let num = ref(1);
1011
1012 let notifier = sink => {
1013 notify :=
1014 (
1015 signal =>
1016 switch (signal) {
1017 | Pull => sink(. Push(0))
1018 | _ => ()
1019 }
1020 );
1021
1022 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1023 };
1024
1025 let source =
1026 Wonka.takeUntil(notifier, sink =>
1027 sink(.
1028 Start(
1029 (. signal) =>
1030 switch (signal) {
1031 | Pull when num^ <= 4 =>
1032 let i = num^;
1033 if (i === 3) {
1034 notify^(Pull);
1035 };
1036 num := num^ + 1;
1037 sink(. Push(i));
1038 | _ => ()
1039 },
1040 ),
1041 )
1042 );
1043
1044 let res = [||];
1045
1046 source((. signal) =>
1047 switch (signal) {
1048 | Start(x) => talkback := x
1049 | _ => ignore(Js.Array.push(signal, res))
1050 }
1051 );
1052
1053 talkback^(. Pull);
1054 talkback^(. Pull);
1055 talkback^(. Pull);
1056 talkback^(. Pull);
1057
1058 expect(res) |> toEqual([|Push(1), Push(2), End|]);
1059 });
1060
1061 it(
1062 "accepts the end of the source when max number of emissions is not reached",
1063 () => {
1064 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1065 let num = ref(1);
1066 let notifier = sink =>
1067 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1068
1069 let source =
1070 Wonka.takeUntil(notifier, sink =>
1071 sink(.
1072 Start(
1073 (. signal) =>
1074 switch (signal) {
1075 | Pull =>
1076 let i = num^;
1077 if (num^ <= 2) {
1078 num := num^ + 1;
1079 sink(. Push(i));
1080 } else {
1081 sink(. End);
1082 };
1083 | _ => ()
1084 },
1085 ),
1086 )
1087 );
1088
1089 let res = [||];
1090
1091 source((. signal) =>
1092 switch (signal) {
1093 | Start(x) => talkback := x
1094 | _ => ignore(Js.Array.push(signal, res))
1095 }
1096 );
1097
1098 talkback^(. Pull);
1099 talkback^(. Pull);
1100 talkback^(. Pull);
1101 talkback^(. Pull);
1102
1103 expect(res) |> toEqual([|Push(1), Push(2), End|]);
1104 });
1105
1106 testPromise("follows the spec for listenables", () =>
1107 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.never))
1108 |> Js.Promise.then_(x =>
1109 expect(x)
1110 |> toEqual(([||], [|Push(1), Push(2), End|]))
1111 |> Js.Promise.resolve
1112 )
1113 );
1114
1115 testPromise("follows the spec for listenables when ending the source", () => {
1116 let end_: talkbackT = Close;
1117
1118 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.fromValue(0)))
1119 |> Js.Promise.then_(x =>
1120 expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
1121 );
1122 });
1123
1124 testPromise(
1125 "ends itself and source when its talkback receives the End signal", () => {
1126 let end_: talkbackT = Close;
1127
1128 Wonka_thelpers.testTalkbackEnd(Wonka.takeUntil(Wonka.never))
1129 |> Js.Promise.then_(x =>
1130 expect(x)
1131 |> toEqual(([|end_|], [|Push(1)|]))
1132 |> Js.Promise.resolve
1133 );
1134 });
1135 });
1136
1137 describe("skip", () => {
1138 open Expect;
1139
1140 it(
1141 "only lets values through after a number of values have been filtered out",
1142 () => {
1143 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1144 let num = ref(1);
1145
1146 let source =
1147 Wonka.skip(2, sink =>
1148 sink(.
1149 Start(
1150 (. signal) =>
1151 switch (signal) {
1152 | Pull when num^ <= 4 =>
1153 let i = num^;
1154 num := num^ + 1;
1155 sink(. Push(i));
1156 | Pull => sink(. End)
1157 | _ => ()
1158 },
1159 ),
1160 )
1161 );
1162
1163 let res = [||];
1164
1165 source((. signal) =>
1166 switch (signal) {
1167 | Start(x) => talkback := x
1168 | _ => ignore(Js.Array.push(signal, res))
1169 }
1170 );
1171
1172 talkback^(. Pull);
1173 talkback^(. Pull);
1174 talkback^(. Pull);
1175 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1176 });
1177
1178 testPromise("follows the spec for listenables", () =>
1179 Wonka_thelpers.testWithListenable(Wonka.skip(0))
1180 |> Js.Promise.then_(x =>
1181 expect(x)
1182 |> toEqual(([||], [|Push(1), Push(2), End|]))
1183 |> Js.Promise.resolve
1184 )
1185 );
1186
1187 testPromise(
1188 "follows the spec for listenables when skipping the source", () =>
1189 Wonka_thelpers.testWithListenable(Wonka.skip(10))
1190 |> Js.Promise.then_(x =>
1191 expect(x)
1192 |> toEqual(([|Pull, Pull|], [|End|]))
1193 |> Js.Promise.resolve
1194 )
1195 );
1196
1197 testPromise(
1198 "ends itself and source when its talkback receives the End signal", () => {
1199 let end_: talkbackT = Close;
1200
1201 Wonka_thelpers.testTalkbackEnd(Wonka.skip(10))
1202 |> Js.Promise.then_(x =>
1203 expect(x)
1204 |> toEqual(([|Pull, end_|], [||]))
1205 |> Js.Promise.resolve
1206 );
1207 });
1208 });
1209
1210 describe("skipWhile", () => {
1211 open Expect;
1212
1213 it(
1214 "only lets values through after the predicate returned false, including the first such value",
1215 () => {
1216 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1217 let num = ref(1);
1218
1219 let source =
1220 Wonka.skipWhile(
1221 (. x) => x <= 2,
1222 sink =>
1223 sink(.
1224 Start(
1225 (. signal) =>
1226 switch (signal) {
1227 | Pull when num^ <= 4 =>
1228 let i = num^;
1229 num := num^ + 1;
1230 sink(. Push(i));
1231 | Pull => sink(. End)
1232 | _ => ()
1233 },
1234 ),
1235 ),
1236 );
1237
1238 let res = [||];
1239
1240 source((. signal) =>
1241 switch (signal) {
1242 | Start(x) => talkback := x
1243 | _ => ignore(Js.Array.push(signal, res))
1244 }
1245 );
1246
1247 talkback^(. Pull);
1248 talkback^(. Pull);
1249 talkback^(. Pull);
1250 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1251 },
1252 );
1253
1254 testPromise("follows the spec for listenables", () =>
1255 Wonka_thelpers.testWithListenable(Wonka.skipWhile((. _) => false))
1256 |> Js.Promise.then_(x =>
1257 expect(x)
1258 |> toEqual(([||], [|Push(1), Push(2), End|]))
1259 |> Js.Promise.resolve
1260 )
1261 );
1262
1263 testPromise(
1264 "follows the spec for listenables when skipping the source", () =>
1265 Wonka_thelpers.testWithListenable(Wonka.skipWhile((. _) => true))
1266 |> Js.Promise.then_(x =>
1267 expect(x)
1268 |> toEqual(([|Pull, Pull|], [|End|]))
1269 |> Js.Promise.resolve
1270 )
1271 );
1272
1273 testPromise(
1274 "ends itself and source when its talkback receives the End signal", () => {
1275 let end_: talkbackT = Close;
1276
1277 Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((. _) => false))
1278 |> Js.Promise.then_(x =>
1279 expect(x)
1280 |> toEqual(([|end_|], [|Push(1)|]))
1281 |> Js.Promise.resolve
1282 );
1283 });
1284 });
1285
1286 describe("skipUntil", () => {
1287 open Expect;
1288
1289 it("only lets values through after the notifier emits a value", () => {
1290 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1291 let notify = ref((_: Wonka_types.talkbackT) => ());
1292 let num = ref(1);
1293
1294 let notifier = sink => {
1295 notify :=
1296 (
1297 signal =>
1298 switch (signal) {
1299 | Pull => sink(. Push(0))
1300 | _ => ()
1301 }
1302 );
1303
1304 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1305 };
1306
1307 let source =
1308 Wonka.skipUntil(notifier, sink =>
1309 sink(.
1310 Start(
1311 (. signal) =>
1312 switch (signal) {
1313 | Pull when num^ <= 4 =>
1314 let i = num^;
1315 if (i === 3) {
1316 notify^(Pull);
1317 };
1318 num := num^ + 1;
1319 sink(. Push(i));
1320 | Pull => sink(. End)
1321 | _ => ()
1322 },
1323 ),
1324 )
1325 );
1326
1327 let res = [||];
1328
1329 source((. signal) =>
1330 switch (signal) {
1331 | Start(x) => talkback := x
1332 | _ => ignore(Js.Array.push(signal, res))
1333 }
1334 );
1335
1336 talkback^(. Pull);
1337 talkback^(. Pull);
1338 talkback^(. Pull);
1339 talkback^(. Pull);
1340
1341 expect(res) |> toEqual([|Push(3), Push(4), End|]);
1342 });
1343
1344 it(
1345 "accepts the end of the source when max number of emissions is not reached",
1346 () => {
1347 let talkback = ref((. _: Wonka_types.talkbackT) => ());
1348 let num = ref(1);
1349 let notifier = sink =>
1350 sink(. Start(Wonka_helpers.talkbackPlaceholder));
1351
1352 let source =
1353 Wonka.skipUntil(notifier, sink =>
1354 sink(.
1355 Start(
1356 (. signal) =>
1357 switch (signal) {
1358 | Pull =>
1359 let i = num^;
1360 if (i < 2) {
1361 num := num^ + 1;
1362 sink(. Push(i));
1363 } else {
1364 sink(. End);
1365 };
1366 | _ => ()
1367 },
1368 ),
1369 )
1370 );
1371
1372 let res = [||];
1373
1374 source((. signal) =>
1375 switch (signal) {
1376 | Start(x) => talkback := x
1377 | _ => ignore(Js.Array.push(signal, res))
1378 }
1379 );
1380
1381 talkback^(. Pull);
1382 talkback^(. Pull);
1383 talkback^(. Pull);
1384
1385 expect(res) |> toEqual([|End|]);
1386 });
1387
1388 testPromise("follows the spec for listenables", () =>
1389 Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.never))
1390 |> Js.Promise.then_(x =>
1391 expect(x)
1392 |> toEqual(([|Pull, Pull, Pull|], [|End|]))
1393 |> Js.Promise.resolve
1394 )
1395 );
1396
/* Same listenable check, but with Wonka.fromValue(0) as the notifier. */
testPromise(
  "follows the spec for listenables when skipping the source", () => {
  let operator = Wonka.skipUntil(Wonka.fromValue(0));
  let expected = ([|Pull|], [|Push(1), Push(2), End|]);

  Wonka_thelpers.testWithListenable(operator)
  |> Js.Promise.then_(observed =>
       Js.Promise.resolve(expect(observed) |> toEqual(expected))
     );
});
1406
/* Runs the shared talkback-end check against skipUntil and compares the
   helper's result with the expected tuple. */
testPromise(
  "ends itself and source when its talkback receives the End signal", () => {
  /* Annotated so that Close is resolved as a talkback signal. */
  let closeSignal: talkbackT = Close;
  let expected = ([|Pull, closeSignal|], [|Push(1)|]);

  Wonka.skipUntil(Wonka.fromValue(0))
  |> Wonka_thelpers.testTalkbackEnd
  |> Js.Promise.then_(outcome =>
       expect(outcome) |> toEqual(expected) |> Js.Promise.resolve
     );
});
1418 });
1419
describe("flatten", () => {
  open Expect;

  /* A list source of two list sources is flattened; the sink pulls five
     times by hand and records everything except Start. */
  it("merges the result of multiple pullables into its source", () => {
    let pull = ref((. _: Wonka_types.talkbackT) => ());
    let received = [||];

    let flattened =
      [Wonka.fromList([1, 2]), Wonka.fromList([1, 2])]
      |> Wonka.fromList
      |> Wonka.flatten;

    flattened((. signal) =>
      switch (signal) {
      | Start(tb) => pull := tb
      | _ => Js.Array.push(signal, received) |> ignore
      }
    );

    for (_i in 1 to 5) {
      pull^(. Pull);
    };

    expect(received)
    |> toEqual([|Push(1), Push(2), Push(1), Push(2), End|]);
  });
});
1447
describe("switchMap", () => {
  afterEach(() => Jest.useRealTimers());
  open Expect;
  open! Expect.Operators;

  /* Subscribes to `source` with a sink that pulls once on Start and again
     after every Push. Returns the array to which each Push and End signal
     is appended as it arrives. */
  let collect = source => {
    let talkback = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        talkback := tb;
        tb(. Pull);
      | Push(_) =>
        Js.Array.push(signal, seen) |> ignore;
        talkback^(. Pull);
      | End => Js.Array.push(signal, seen) |> ignore
      }
    );

    seen;
  };

  it("maps from a source and switches to a new source", () => {
    let squared =
      Wonka.fromList([1, 2, 3])
      |> Wonka.switchMap((. x) => Wonka.fromList([x * x]));
    let signals = collect(squared);

    expect(signals) |> toEqual([|Push(1), Push(4), Push(9), End|]);
  });

  it("unsubscribes from previous subscriptions", () => {
    Jest.useFakeTimers();

    let switched =
      Wonka.interval(100)
      |> Wonka.switchMap((. _) => Wonka.interval(25))
      |> Wonka.take(5);
    let signals = collect(switched);

    Jest.runTimersToTime(300);

    expect(signals)
    |> toEqual([|Push(0), Push(1), Push(2), Push(0), Push(1), End|]);
  });

  testPromise("follows the spec for listenables", () => {
    /* Wraps the helper-provided source in a one-element list source and
       switches straight into it. */
    let operator = source =>
      Wonka.fromList([source]) |> Wonka.switchMap((. inner) => inner);

    Wonka_thelpers.testWithListenable(operator)
    |> Js.Promise.then_(observed =>
         Js.Promise.resolve(
           expect(observed)
           |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|])),
         )
       );
  });

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let operator = source =>
      Wonka.fromList([source]) |> Wonka.switchMap((. inner) => inner);

    Wonka_thelpers.testTalkbackEnd(operator)
    |> Js.Promise.then_(observed =>
         Js.Promise.resolve(
           expect(observed)
           |> toEqual(([|Pull, Pull, Close|], [|Push(1)|])),
         )
       );
  });
});
1526
describe("buffer", () => {
  open Expect;
  open! Expect.Operators;

  beforeEach(() => Jest.useFakeTimers());
  afterEach(() => Jest.useRealTimers());

  /* Subscribes to `source` with a sink that pulls once on Start and again
     after every Push. Returns the array to which each Push and End signal
     is appended as it arrives. */
  let collect = source => {
    let talkback = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        talkback := tb;
        tb(. Pull);
      | Push(_) =>
        Js.Array.push(signal, seen) |> ignore;
        talkback^(. Pull);
      | End => Js.Array.push(signal, seen) |> ignore
      }
    );

    seen;
  };

  it("should buffer values and emit them on each notifier tick", () => {
    /* Values come from a 50ms interval; a 100ms interval is passed as the
       first argument to buffer. Only the first two buffers are taken. */
    let buffered =
      Wonka.interval(50)
      |> Wonka.buffer(Wonka.interval(100))
      |> Wonka.take(2);
    let signals = collect(buffered);

    Jest.runTimersToTime(400);

    expect(signals) == [|Push([|0, 1|]), Push([|2, 3|]), End|];
  });

  /* NOTE(review): the title says "notifier", but the stream limited by
     take(3) below is passed in the same position as the buffered value
     stream of the previous test. Confirm which ending this test is meant
     to cover. */
  it("should end when the notifier ends", () => {
    let limited = Wonka.interval(50) |> Wonka.take(3);
    let buffered = limited |> Wonka.buffer(Wonka.interval(100));
    let signals = collect(buffered);

    Jest.runTimersToTime(400);

    expect(signals) == [|Push([|0, 1|]), Push([|2|]), End|];
  });
});
1584});
1585
1586describe("sink factories", () => {
describe("forEach", () =>
  Expect.(
    it("calls a function for each emission of the passed source", () => {
      let counter = ref(0);
      let collected = [||];

      /* Pullable source: answers the first four Pulls with 0, 1, 2, 3 and
         every later Pull with End. */
      let source = sink =>
        sink(.
          Start(
            (. request) =>
              switch (request) {
              | Pull =>
                if (counter^ < 4) {
                  let current = counter^;
                  counter := current + 1;
                  sink(. Push(current));
                } else {
                  sink(. End);
                }
              | _ => ()
              },
          ),
        );

      source
      |> Wonka.forEach((. value) =>
           Js.Array.push(value, collected) |> ignore
         );

      expect(collected) |> toEqual([|0, 1, 2, 3|]);
    })
  )
);
1614
describe("subscribe", () =>
  Expect.(
    it(
      "calls a function for each emission of the passed source and stops when unsubscribed",
      () => {
        let counter = ref(0);
        let collected = [||];
        /* Replaced on subscription with a function that pushes the next
           counter value into the subscribed sink. */
        let emit = ref(() => ());

        let source = sink => {
          emit :=
            (
              () => {
                let current = counter^;
                counter := current + 1;
                sink(. Push(current));
              }
            );

          sink(. Start(Wonka_helpers.talkbackPlaceholder));
        };

        let subscription =
          source
          |> Wonka.subscribe((. value) =>
               Js.Array.push(value, collected) |> ignore
             );

        /* Two emissions before unsubscribing, two after. */
        emit^();
        emit^();
        subscription.unsubscribe();
        emit^();
        emit^();

        expect(collected) |> toEqual([|0, 1|]);
      },
    )
  )
);
1654
describe("toObservable", () =>
  Expect.(
    testPromise("should convert a source to an Observable", () => {
      let collected = [||];

      let observable =
        Wonka_thelpers.observableFrom(
          Wonka.toObservable(Wonka.fromArray([|1, 2|])),
        );

      /* observableForEach returns a promise; the assertion runs once it
         resolves. */
      Wonka_thelpers.observableForEach(observable, value =>
        Js.Array.push(value, collected) |> ignore
      )
      |> Js.Promise.then_(() =>
           Js.Promise.resolve(expect(collected) |> toEqual([|1, 2|]))
         );
    })
  )
);
1675
describe("toCallbag", () =>
  Expect.(
    it("should convert a source to a Callbag", () => {
      let collected = [||];
      let callbag = Wonka.toCallbag(Wonka.fromArray([|1, 2|]));

      /* callbagIterate is applied in two uncurried steps: first to the
         per-value callback, then to the callbag itself. */
      let iterate =
        Wonka_thelpers.callbagIterate(. value =>
          Js.Array.push(value, collected) |> ignore
        );
      iterate(. callbag);

      expect(collected) |> toEqual([|1, 2|]);
    })
  )
);
1695
describe("toArray", () => {
  open Expect;

  it("converts iterable sources to arrays", () => {
    let expected = [|1, 2, 3|];
    let result = Wonka.toArray(Wonka.fromArray(expected));

    expect(result) |> toEqual(expected);
  });

  it("converts mapped iterable sources to arrays", () => {
    let expected = [|1, 2, 3|];
    /* Identity map: the output must equal the input. */
    let mapped = Wonka.fromArray(expected) |> Wonka.map((. x) => x);
    let result = Wonka.toArray(mapped);

    expect(result) |> toEqual(expected);
  });

  it("converts concatenated iterable sources to arrays", () => {
    let first = [|1, 2, 3|];
    let second = [|1, 2, 3|];
    let expected = Array.append(first, second);

    let result =
      [|Wonka.fromArray(first), Wonka.fromArray(second)|]
      |> Wonka.concat
      |> Wonka.toArray;

    expect(result) |> toEqual(expected);
  });

  it("ignores and closes asynchronous push streams", () => {
    /* An array source followed by Wonka.never. The test inspects the
       talkback signals recorded by testSourceOperator, not the array
       returned by toArray. */
    let combined =
      Wonka.concat([|Wonka.fromArray([|1, 2, 3|]), Wonka.never|]);
    let (talkbackSignals, observedSource) =
      Wonka_thelpers.testSourceOperator(combined);

    observedSource |> Wonka.toArray |> ignore;

    expect(talkbackSignals) |> toEqual([|Pull, Close|]);
  });
});
1734});
1735
describe("chains (integration)", () =>
  Expect.(
    /* Runs 0..999 through fromArray -> map -> forEach and checks that the
       values collected by forEach are the stringified inputs, in order. */
    it("fromArray, map, forEach", () => {
      let input = Array.init(1000, i => i);
      /* Expected result, computed without Wonka. Previously `Array.map` was
         only partially applied here (the array argument was missing), so
         `output` was a function rather than an array. */
      let output = Array.map(x => string_of_int(x), input);
      let actual = [||];

      input
      |> Wonka.fromArray
      |> Wonka.map((. x) => string_of_int(x))
      |> Wonka.forEach((. x) => ignore(Js.Array.push(x, actual)));

      /* Assert on what the chain actually produced. The previous assertion
         compared `output` with itself and could never fail. */
      expect(actual) |> toEqual(output);
    })
  )
);
1752
describe("subject", () => {
  open Expect;
  open! Expect.Operators;

  it("sends values passed to .next to puller sinks", () => {
    let subject = Wonka.makeSubject();
    let received = [||];

    /* The sink records Push and End and ignores Start. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    List.iter(value => subject.next(value), [10, 20, 30, 40]);
    subject.complete();

    expect(received)
    |> toEqual([|Push(10), Push(20), Push(30), Push(40), End|]);
  });

  it("handles multiple sinks", () => {
    let subject = Wonka.makeSubject();
    let firstTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let firstReceived = [||];
    let secondReceived = [||];

    /* First sink keeps its talkback so that it can be closed mid-stream. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, firstReceived) |> ignore
      }
    );

    /* Second sink stays subscribed until the subject completes. */
    subject.source((. signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => Js.Array.push(signal, secondReceived) |> ignore
      }
    );

    List.iter(value => subject.next(value), [10, 20, 30]);

    firstTalkback^(. Close);

    List.iter(value => subject.next(value), [40, 50]);
    subject.complete();

    expect((firstReceived, secondReceived))
    |> toEqual((
         [|Push(10), Push(20), Push(30)|],
         [|Push(10), Push(20), Push(30), Push(40), Push(50), End|],
       ));
  });

  it("handles multiple sinks that subscribe and close at different times", () => {
    let subject = Wonka.makeSubject();
    let firstTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let secondTalkback = ref((. _: Wonka_types.talkbackT) => ());
    let firstReceived = [||];
    let secondReceived = [||];

    /* Emitted before anyone has subscribed. */
    List.iter(value => subject.next(value), [10, 20]);

    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, firstReceived) |> ignore
      }
    );

    subject.next(30);

    subject.source((. signal) =>
      switch (signal) {
      | Start(tb) => secondTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, secondReceived) |> ignore
      }
    );

    List.iter(value => subject.next(value), [40, 50]);

    secondTalkback^(. Close);

    subject.next(60);

    firstTalkback^(. Close);

    /* Emitted after both sinks have closed. */
    subject.next(70);
    subject.complete();

    expect((firstReceived, secondReceived))
    |> toEqual((
         [|Push(30), Push(40), Push(50), Push(60)|],
         [|Push(40), Push(50)|],
       ));
  });
});
1868
1869describe("web operators", () => {
describe("delay", () => {
  open Expect;
  open! Expect.Operators;

  afterEach(() => Jest.useRealTimers());

  /* Builds fromList([1, 2, 3]) |> delay(200) |> take(3) and subscribes with
     a sink that pulls once on Start and again after every Push. Returns the
     array to which each Push and End signal is appended. Every test calls
     this after Jest.useFakeTimers(). */
  let recordDelayed = () => {
    let talkback = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    let source =
      Wonka.fromList([1, 2, 3]) |> WonkaJs.delay(200) |> Wonka.take(3);

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        talkback := tb;
        tb(. Pull);
      | Push(_) =>
        Js.Array.push(signal, seen) |> ignore;
        talkback^(. Pull);
      | End => Js.Array.push(signal, seen) |> ignore
      }
    );

    seen;
  };

  it("should not emit values before specified delay", () => {
    Jest.useFakeTimers();
    let signals = recordDelayed();

    /* No timers are advanced in this test. */
    expect(signals) == [||];
  });

  it("should emit values after specified delay", () => {
    Jest.useFakeTimers();
    let signals = recordDelayed();

    Jest.runTimersToTime(400);

    expect(signals) == [|Push(1), Push(2)|];
  });

  it("should emit an End signal when the source has emitted all values", () => {
    Jest.useFakeTimers();
    let signals = recordDelayed();

    Jest.runTimersToTime(600);

    expect(signals) == [|Push(1), Push(2), Push(3), End|];
  });
});
1952
describe("throttle", () => {
  open Expect;
  open! Expect.Operators;

  afterEach(() => Jest.useRealTimers());

  /* Builds interval(100) |> throttle(_ => 600) |> take(3) and subscribes
     with a sink that pulls once on Start and again after every Push.
     Returns the array to which each Push and End signal is appended. Every
     test calls this after Jest.useFakeTimers(). */
  let recordThrottled = () => {
    let talkback = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    let source =
      Wonka.interval(100)
      |> WonkaJs.throttle((. _) => 600)
      |> Wonka.take(3);

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        talkback := tb;
        tb(. Pull);
      | Push(_) =>
        Js.Array.push(signal, seen) |> ignore;
        talkback^(. Pull);
      | End => Js.Array.push(signal, seen) |> ignore
      }
    );

    seen;
  };

  it(
    "should not emit values before specified throttle (but include values on leading edge)",
    () => {
      Jest.useFakeTimers();
      let signals = recordThrottled();

      Jest.runTimersToTime(400);

      expect(signals) == [|Push(0)|];
    },
  );

  it("should throttle emissions by the specified throttle", () => {
    Jest.useFakeTimers();
    let signals = recordThrottled();

    Jest.runTimersToTime(1000);

    expect(signals) == [|Push(0), Push(7)|];
  });

  it("should emit an End signal when the source has emitted all values", () => {
    Jest.useFakeTimers();
    let signals = recordThrottled();

    Jest.runTimersToTime(1500);

    expect(signals) == [|Push(0), Push(7), Push(14), End|];
  });
});
2040
describe("debounce", () => {
  open Expect;
  open! Expect.Operators;

  afterEach(() => Jest.useRealTimers());

  /* Builds fromList([1, 2, 3]) |> debounce(_ => 1000) |> take(3) and
     subscribes with a sink that pulls once on Start and again after every
     Push. Returns the array to which each Push and End signal is appended.
     Every test calls this after Jest.useFakeTimers(). */
  let recordDebounced = () => {
    let talkback = ref((. _: Wonka_types.talkbackT) => ());
    let seen = [||];

    let source =
      Wonka.fromList([1, 2, 3])
      |> WonkaJs.debounce((. _) => 1000)
      |> Wonka.take(3);

    source((. signal) =>
      switch (signal) {
      | Start(tb) =>
        talkback := tb;
        tb(. Pull);
      | Push(_) =>
        Js.Array.push(signal, seen) |> ignore;
        talkback^(. Pull);
      | End => Js.Array.push(signal, seen) |> ignore
      }
    );

    seen;
  };

  it(
    "should not emit values if emitted before the debounce specified by the duration selector",
    () => {
      Jest.useFakeTimers();
      let signals = recordDebounced();

      Jest.runTimersToTime(500);

      expect(signals) == [||];
    },
  );

  it("should debounce emissions based on the duration selector", () => {
    Jest.useFakeTimers();
    let signals = recordDebounced();

    Jest.runTimersToTime(2000);

    expect(signals) == [|Push(1), Push(2)|];
  });

  it("should emit an End signal when the source has emitted all values", () => {
    Jest.useFakeTimers();
    let signals = recordDebounced();

    Jest.runTimersToTime(3000);

    expect(signals) == [|Push(1), Push(2), Push(3), End|];
  });
});
2128
describe("toPromise", () => {
  open Expect;
  open! Expect.Operators;

  /* A single-value source resolves to that value. */
  testPromise("should convert a source to a Promise", () =>
    WonkaJs.toPromise(Wonka.fromValue(1))
    |> Js.Promise.then_(resolved =>
         Js.Promise.resolve(expect(resolved) |> toEqual(1))
       )
  );

  /* A three-item list source resolves to its last item. */
  testPromise("should resolve only the last emitted value from a source", () =>
    WonkaJs.toPromise(Wonka.fromList([1, 2, 3]))
    |> Js.Promise.then_(resolved =>
         Js.Promise.resolve(expect(resolved) |> toEqual(3))
       )
  );
});
2149});