1open Jest;
2open Wonka_types;
3
/* Alias Jest's `test` as `it` so the specs below can be written with `it(...)`. */
let it = test;
5
6describe("source factories", () => {
/* fromList: subscribes a pulling sink and checks the order of received signals. */
describe("fromList", () => {
  open Expect;
  open! Expect.Operators;

  it("sends list items to a puller sink", () => {
    let listSource = Wonka.fromList([10, 20, 30]);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];

    /* Keep the talkback handed over on Start; record every other signal. */
    listSource((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | Push(_)
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    /* Four pulls in total; the expectation below lists three pushes and End. */
    for (_i in 1 to 4) {
      tb^(.Pull);
    };

    expect(received) == [|Push(10), Push(20), Push(30), End|];
  });
});
32
/* fromArray: the sink pulls again from inside its own Push handler. */
describe("fromArray", () => {
  open Expect;
  open! Expect.Operators;

  it("sends array items to a puller sink", () => {
    let arraySource = Wonka.fromArray([|10, 20, 30|]);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let seen = ref([||]);
    /* Records a signal by building a new array rather than mutating in place. */
    let record = signal => seen := Array.append(seen^, [|signal|]);

    arraySource((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(_) =>
        record(signal);
        tb^(.Pull);
      | End => record(signal)
      }
    );

    expect(seen^) == [|Push(10), Push(20), Push(30), End|];
  });

  it("does not blow up the stack when iterating something huge", () => {
    let input = Array.make(100000, 123);
    let hugeSource = Wonka.fromArray(input);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let collected = [||];

    /* Every Push immediately triggers the next Pull from within the sink. */
    hugeSource((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(x) =>
        Js.Array.push(x, collected) |> ignore;
        tb^(.Pull);
      | End => ()
      }
    );

    expect(Array.length(collected)) == Array.length(input);
  });
});
82
/* fromValue: subscribes a pulling sink to a source built from one value. */
describe("fromValue", () => {
  open Expect;
  open! Expect.Operators;

  it("sends a single item to a puller sink", () => {
    let valueSource = Wonka.fromValue(123);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];

    valueSource((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | Push(_)
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    /* The third pull is one extra, to check that no signal comes back after
       the source has ended. */
    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(received) == [|Push(123), End|];
  });
});
107
/* empty: compares the signal log before and after pulling. */
describe("empty", () => {
  open Expect;
  open! Expect.Operators;

  it("ends immediately", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let log = [||];

    Wonka.empty((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    /* Snapshot of the log taken before any Pull is issued. */
    let beforePulls = Array.copy(log);

    for (_i in 1 to 2) {
      tb^(.Pull);
    };

    /* Both the snapshot and the final log are expected to hold a single End. */
    expect((beforePulls, log)) == ([|End|], [|End|]);
  });
});
131
/* never: checks that no End signal reaches the sink, even after pulling. */
describe("never", () => {
  open Expect;
  open! Expect.Operators;

  it("does not end", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let sawEnd = ref(false);

    Wonka.never((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | End => sawEnd := true
      | _ => ()
      }
    );

    for (_i in 1 to 2) {
      tb^(.Pull);
    };

    expect(sawEnd^) === false;
  });
});
154});
155
156describe("operator factories", () => {
/* map: one hand-written pull scenario plus the two shared helper checks. */
describe("map", () => {
  open Expect;

  it("maps all emissions of a source", () => {
    let counter = ref(1);
    let collected = [||];
    let tb = ref((._: Wonka_types.talkbackT) => ());

    Wonka.map(
      /* The mapper ignores its input and returns the running counter. */
      (._) => {
        let current = counter^;
        counter := current + 1;
        current;
      },
      /* Pullable source that answers every Pull with Push(1). */
      sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull => sink(. Push(1))
              | _ => ()
              },
          ),
        ),
      /* The sink keeps pulling while the counter is still below 6. */
      (.signal) =>
        switch (signal) {
        | Start(x) =>
          tb := x;
          x(.Pull);
        | Push(x) when counter^ < 6 =>
          Js.Array.push(x, collected) |> ignore;
          tb^(.Pull);
        | _ => ()
        },
    );

    expect(collected) |> toEqual([|1, 2, 3, 4|]);
  });

  /* The helper resolves to a pair of arrays: talkback signals first, sink
     signals second (presumably those seen by the source and by the sink). */
  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.map((.x) => x))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.map((.x) => x))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
213
/* filter: predicate-based filtering plus the shared helper checks. */
describe("filter", () => {
  open Expect;

  it("filters emissions according to a predicate", () => {
    let next = ref(1);
    let kept = [||];
    let tb = ref((._: Wonka_types.talkbackT) => ());

    Wonka.filter(
      (.x) => x mod 2 === 0,
      /* Pullable source counting upwards from 1, one value per Pull. */
      sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | _ => ()
              },
          ),
        ),
      /* The sink stops pulling once a value of 6 or more gets through. */
      (.signal) =>
        switch (signal) {
        | Start(x) =>
          tb := x;
          x(.Pull);
        | Push(x) when x < 6 =>
          Js.Array.push(x, kept) |> ignore;
          tb^(.Pull);
        | _ => ()
        },
    );

    expect(kept) |> toEqual([|2, 4|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.filter((._) => true))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when filtering", () =>
    Wonka_thelpers.testWithListenable(Wonka.filter((._) => false))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull|], [|End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.filter((._) => true))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
279
/* scan: running fold over a pullable source, plus the shared helper checks. */
describe("scan", () => {
  open Expect;

  it("folds emissions using an initial seed value", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source pushes 1, 2, 3 on successive pulls and End afterwards. */
    let summed =
      Wonka.scan(
        (.acc, x) => acc + x,
        0,
        sink =>
          sink(.
            Start(
              (.signal) =>
                switch (signal) {
                | Pull when next^ <= 3 =>
                  let value = next^;
                  next := value + 1;
                  sink(. Push(value));
                | Pull => sink(. End)
                | _ => ()
                },
            ),
          ),
      );

    let log = [||];

    summed((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 4) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(1), Push(3), Push(6), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.scan((._, x) => x, 0))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.scan((._, x) => x, 0))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
338
/* merge: two list sources merged and drained by a sink that re-pulls on Push. */
describe("merge", () => {
  open Expect;
  open! Expect.Operators;

  it("merges different sources into a single one", () => {
    let first = Wonka.fromList([1, 2, 3]);
    let second = Wonka.fromList([4, 5, 6]);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];
    let merged = Wonka.merge([|first, second|]);

    merged((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(_) =>
        Js.Array.push(signal, received) |> ignore;
        tb^(.Pull);
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    expect(received)
    == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(source => Wonka.merge([|source|]))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () =>
    Wonka_thelpers.testTalkbackEnd(source => Wonka.merge([|source|]))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
         |> Js.Promise.resolve
       )
  );
});
385
/* concat: two list sources concatenated and drained by a re-pulling sink. */
describe("concat", () => {
  open Expect;
  open! Expect.Operators;

  it("concatenates different sources into a single one", () => {
    let first = Wonka.fromList([1, 2, 3]);
    let second = Wonka.fromList([4, 5, 6]);
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];
    let joined = Wonka.concat([|first, second|]);

    joined((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(_) =>
        Js.Array.push(signal, received) |> ignore;
        tb^(.Pull);
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    expect(received)
    == [|Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End|];
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(source => Wonka.concat([|source|]))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () =>
    Wonka_thelpers.testTalkbackEnd(source => Wonka.concat([|source|]))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
         |> Js.Promise.resolve
       )
  );
});
432
/* share: three sinks subscribe to one shared source; only the first one pulls. */
describe("share", () => {
  open Expect;

  it("shares an underlying source with all sinks", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let closerTb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);
    let log = [||];

    /* The underlying source pushes 1 and 2, then answers further pulls with End. */
    let shared =
      Wonka.share(sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ <= 2 =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    /* First sink: keeps the talkback that drives all pulls below. */
    shared((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    /* Second sink: purely passive. */
    shared((.signal) =>
      switch (signal) {
      | Start(_) => ()
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    /* Third sink: sends Close through its own talkback after its first signal. */
    shared((.signal) =>
      switch (signal) {
      | Start(x) => closerTb := x
      | _ =>
        Js.Array.push(signal, log) |> ignore;
        closerTb^(.Close);
      }
    );

    tb^(.Pull);
    let afterFirstPull = Array.copy(log);
    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect((afterFirstPull, log))
    |> toEqual((
         [|Push(1), Push(1), Push(1)|],
         [|Push(1), Push(1), Push(1), Push(2), Push(2), End, End|],
       ));
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.share)
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.share)
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
511
/* combine: pairs the latest values of two sources. */
describe("combine", () => {
  open Expect;

  it("combines the latest values of two sources", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());

    /* Builds a pullable source that pushes 1*factor and 2*factor and answers
       any later pull with End. Each call gets its own counter. */
    let makeSource = (factor: int) => {
      let step = ref(1);

      sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when step^ <= 2 =>
                let value = step^ * factor;
                step := step^ + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        );
    };

    let left = makeSource(1);
    let right = makeSource(2);
    let combined = Wonka.combine(left, right);
    let log = [||];

    combined((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 4) {
      tb^(.Pull);
    };

    expect(log)
    |> toEqual([|Push((1, 2)), Push((2, 2)), Push((2, 4)), End|]);
  });

  /* Both helper checks below combine a shared source with itself. */
  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(source => {
      let shared = Wonka.share(source);
      Wonka.combine(shared, shared);
    })
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual((
              [||],
              [|Push((1, 1)), Push((2, 1)), Push((2, 2)), End|],
            ))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(source => {
      let shared = Wonka.share(source);
      Wonka.combine(shared, shared);
    })
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push((1, 1))|]))
         |> Js.Promise.resolve
       );
  });
});
584
/* take: caps the number of emissions; also covers a source ending early. */
describe("take", () => {
  open Expect;

  it("only lets a maximum number of values through", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source never ends on its own: every Pull yields the next integer. */
    let limited =
      Wonka.take(2, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    limited((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 4) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(1), Push(2), End|]);
  });

  it(
    "accepts the end of the source when max number of emissions is not reached",
    () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source pushes a single value and ends on the following pull. */
    let limited =
      Wonka.take(2, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ < 2 =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    limited((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(1), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.take(10))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when ending the source", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testWithListenable(Wonka.take(1))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1), End|]))
         |> Js.Promise.resolve
       );
  });

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.take(10))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
684
/* takeLast: checks which emissions of a finite source reach the sink. */
describe("takeLast", () => {
  open Expect;

  it("only lets the last n values through on an entirely new source", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source pushes 1 through 4 and answers later pulls with End. */
    let lastTwo =
      Wonka.takeLast(2, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ <= 4 =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    lastTwo((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(3), Push(4), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.takeLast(10))
    |> Js.Promise.then_(result =>
         expect(result)
         /* The sink side is empty since the source is a pullable. */
         |> toEqual(([|Pull, Pull, Pull|], [||]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () =>
    Wonka_thelpers.testTalkbackEnd(Wonka.takeLast(10))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull|], [||]))
         |> Js.Promise.resolve
       )
  );
});
737
/* takeWhile: lets values through until the predicate first returns false. */
describe("takeWhile", () => {
  open Expect;

  /* This title used to be a copy of the takeLast one; it now describes the
     predicate-based cut-off that the test actually checks. */
  it("only lets values through while the predicate returns true", () => {
    let talkback = ref((._: Wonka_types.talkbackT) => ());
    let num = ref(1);

    /* The wrapped source never ends on its own: every Pull yields the next integer. */
    let source =
      Wonka.takeWhile(
        (.x) => x <= 2,
        sink =>
          sink(.
            Start(
              (.signal) =>
                switch (signal) {
                | Pull =>
                  let i = num^;
                  num := num^ + 1;
                  sink(. Push(i));
                | _ => ()
                },
            ),
          ),
      );

    let res = [||];

    source((.signal) =>
      switch (signal) {
      | Start(x) => talkback := x
      | _ => ignore(Js.Array.push(signal, res))
      }
    );

    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);

    expect(res) |> toEqual([|Push(1), Push(2), End|]);
  });

  it(
    "accepts the end of the source when max number of emissions is not reached",
    () => {
    let talkback = ref((._: Wonka_types.talkbackT) => ());
    let num = ref(1);

    /* The wrapped source pushes a single value and ends on the following pull,
       well before the predicate (x <= 5) could return false. */
    let source =
      Wonka.takeWhile(
        (.x) => x <= 5,
        sink =>
          sink(.
            Start(
              (.signal) =>
                switch (signal) {
                | Pull =>
                  let i = num^;
                  if (i < 2) {
                    num := num^ + 1;
                    sink(. Push(i));
                  } else {
                    sink(. End);
                  };
                | _ => ()
                },
            ),
          ),
      );

    let res = [||];

    source((.signal) =>
      switch (signal) {
      | Start(x) => talkback := x
      | _ => ignore(Js.Array.push(signal, res))
      }
    );

    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);

    expect(res) |> toEqual([|Push(1), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => true))
    |> Js.Promise.then_(x =>
         expect(x)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when ending the source", () => {
    let end_: talkbackT = Close;

    Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => false))
    |> Js.Promise.then_(x =>
         expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
       );
  });

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let end_: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((._) => true))
    |> Js.Promise.then_(x =>
         expect(x)
         |> toEqual(([|end_|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
839
/* takeUntil: lets values through until a notifier source emits. */
describe("takeUntil", () => {
  open Expect;

  /* This title used to be a copy of the takeLast one; it now describes the
     notifier-based cut-off that the test actually checks. */
  it("only lets values through until the notifier emits a value", () => {
    let talkback = ref((._: Wonka_types.talkbackT) => ());
    let notify = ref((_: Wonka_types.talkbackT) => ());
    let num = ref(1);

    /* The notifier exposes a manual trigger through `notify`: calling it with
       Pull makes the notifier push a value to its sink. */
    let notifier = sink => {
      notify :=
        (
          signal =>
            switch (signal) {
            | Pull => sink(. Push(0))
            | _ => ()
            }
        );

      sink(. Start(Wonka_helpers.talkbackPlaceholder));
    };

    /* The wrapped source fires the notifier just before it would push 3. */
    let source =
      Wonka.takeUntil(notifier, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when num^ <= 4 =>
                let i = num^;
                if (i === 3) {
                  notify^(Pull);
                };
                num := num^ + 1;
                sink(. Push(i));
              | _ => ()
              },
          ),
        )
      );

    let res = [||];

    source((.signal) =>
      switch (signal) {
      | Start(x) => talkback := x
      | _ => ignore(Js.Array.push(signal, res))
      }
    );

    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);

    expect(res) |> toEqual([|Push(1), Push(2), End|]);
  });

  it(
    "accepts the end of the source when max number of emissions is not reached",
    () => {
    let talkback = ref((._: Wonka_types.talkbackT) => ());
    let num = ref(1);
    /* This notifier only starts its sink and never pushes anything. */
    let notifier = sink => sink(. Start(Wonka_helpers.talkbackPlaceholder));

    let source =
      Wonka.takeUntil(notifier, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull =>
                let i = num^;
                if (num^ <= 2) {
                  num := num^ + 1;
                  sink(. Push(i));
                } else {
                  sink(. End);
                };
              | _ => ()
              },
          ),
        )
      );

    let res = [||];

    source((.signal) =>
      switch (signal) {
      | Start(x) => talkback := x
      | _ => ignore(Js.Array.push(signal, res))
      }
    );

    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);
    talkback^(.Pull);

    expect(res) |> toEqual([|Push(1), Push(2), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.never))
    |> Js.Promise.then_(x =>
         expect(x)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when ending the source", () => {
    let end_: talkbackT = Close;

    Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.fromValue(0)))
    |> Js.Promise.then_(x =>
         expect(x) |> toEqual(([|end_|], [|End|])) |> Js.Promise.resolve
       );
  });

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let end_: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.takeUntil(Wonka.never))
    |> Js.Promise.then_(x =>
         expect(x)
         |> toEqual(([|end_|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
954
/* skip: drops a fixed number of leading values. */
describe("skip", () => {
  open Expect;

  it(
    "only lets values through after a number of values have been filtered out",
    () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source pushes 1 through 4 and answers later pulls with End. */
    let skipped =
      Wonka.skip(2, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ <= 4 =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    skipped((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(3), Push(4), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.skip(0))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when skipping the source", () =>
    Wonka_thelpers.testWithListenable(Wonka.skip(10))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull|], [|End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.skip(10))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, closeSignal|], [||]))
         |> Js.Promise.resolve
       );
  });
});
1018
/* skipWhile: drops leading values for as long as the predicate holds. */
describe("skipWhile", () => {
  open Expect;

  it(
    "only lets values through after the predicate returned false, including the first such value",
    () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The wrapped source pushes 1 through 4 and answers later pulls with End. */
    let skipped =
      Wonka.skipWhile(
        (.x) => x <= 2,
        sink =>
          sink(.
            Start(
              (.signal) =>
                switch (signal) {
                | Pull when next^ <= 4 =>
                  let value = next^;
                  next := value + 1;
                  sink(. Push(value));
                | Pull => sink(. End)
                | _ => ()
                },
            ),
          ),
      );

    let log = [||];

    skipped((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(3), Push(4), End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => false))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([||], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when skipping the source", () =>
    Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => true))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull|], [|End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((._) => false))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
1082
/* skipUntil: drops values until a notifier source emits. */
describe("skipUntil", () => {
  open Expect;

  it("only lets values through after the notifier emits a value", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let trigger = ref((_: Wonka_types.talkbackT) => ());
    let next = ref(1);

    /* The notifier exposes a manual trigger through `trigger`: calling it with
       Pull makes the notifier push a value to its sink. */
    let notifier = sink => {
      trigger :=
        (
          signal =>
            switch (signal) {
            | Pull => sink(. Push(0))
            | _ => ()
            }
        );

      sink(. Start(Wonka_helpers.talkbackPlaceholder));
    };

    /* The wrapped source fires the notifier just before it pushes 3, then
       continues up to 4 and answers later pulls with End. */
    let skipped =
      Wonka.skipUntil(notifier, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ <= 4 =>
                let value = next^;
                if (value === 3) {
                  trigger^(Pull);
                };
                next := next^ + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    skipped((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 4) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(3), Push(4), End|]);
  });

  it(
    "accepts the end of the source when max number of emissions is not reached",
    () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let next = ref(1);
    /* This notifier only starts its sink and never pushes anything. */
    let notifier = sink => sink(. Start(Wonka_helpers.talkbackPlaceholder));

    /* The wrapped source pushes a single value and ends on the following pull. */
    let skipped =
      Wonka.skipUntil(notifier, sink =>
        sink(.
          Start(
            (.signal) =>
              switch (signal) {
              | Pull when next^ < 2 =>
                let value = next^;
                next := value + 1;
                sink(. Push(value));
              | Pull => sink(. End)
              | _ => ()
              },
          ),
        )
      );

    let log = [||];

    skipped((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 3) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|End|]);
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.never))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Pull|], [|End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise("follows the spec for listenables when skipping the source", () =>
    Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.fromValue(0)))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull|], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () => {
    let closeSignal: talkbackT = Close;

    Wonka_thelpers.testTalkbackEnd(Wonka.skipUntil(Wonka.fromValue(0)))
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, closeSignal|], [|Push(1)|]))
         |> Js.Promise.resolve
       );
  });
});
1195
/* flatten: a source of list sources is flattened and pulled five times. */
describe("flatten", () => {
  open Expect;

  it("merges the result of multiple pullables into its source", () => {
    let tb = ref((._: Wonka_types.talkbackT) => ());
    let inner = [Wonka.fromList([1, 2]), Wonka.fromList([1, 2])];
    let flattened = Wonka.flatten(Wonka.fromList(inner));

    let log = [||];

    flattened((.signal) =>
      switch (signal) {
      | Start(x) => tb := x
      | _ => Js.Array.push(signal, log) |> ignore
      }
    );

    for (_i in 1 to 5) {
      tb^(.Pull);
    };

    expect(log) |> toEqual([|Push(1), Push(2), Push(1), Push(2), End|]);
  });
});
1221
/* switchMap: maps each value to an inner source; one test uses fake timers. */
describe("switchMap", () => {
  /* One test below installs fake timers, so restore real ones after each test. */
  afterEach(() => Jest.useRealTimers());

  open Expect;
  open! Expect.Operators;

  it("maps from a source and switches to a new source", () => {
    let outer = Wonka.fromList([1, 2, 3]);

    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];
    let squared = Wonka.switchMap((.x) => Wonka.fromList([x * x]), outer);

    squared((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(_) =>
        Js.Array.push(signal, received) |> ignore;
        tb^(.Pull);
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    expect(received) == [|Push(1), Push(4), Push(9), End|];
  });

  it("unsubscribes from previous subscriptions", () => {
    Jest.useFakeTimers();

    let outer = Wonka.interval(100);

    let tb = ref((._: Wonka_types.talkbackT) => ());
    let received = [||];
    /* Each outer tick starts a new inner interval; the result is capped at
       five values. */
    let switched =
      Wonka.switchMap((._) => Wonka.interval(25), outer) |> Wonka.take(5);

    switched((.signal) =>
      switch (signal) {
      | Start(x) =>
        tb := x;
        x(.Pull);
      | Push(_) =>
        Js.Array.push(signal, received) |> ignore;
        tb^(.Pull);
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    Jest.runTimersToTime(300);

    /* The inner count is expected to restart at 0 after the switch. */
    expect(received)
    == [|Push(0), Push(1), Push(2), Push(0), Push(1), End|];
  });

  testPromise("follows the spec for listenables", () =>
    Wonka_thelpers.testWithListenable(source =>
      Wonka.switchMap((.x) => x, Wonka.fromList([source]))
    )
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|]))
         |> Js.Promise.resolve
       )
  );

  testPromise(
    "ends itself and source when its talkback receives the End signal", () =>
    Wonka_thelpers.testTalkbackEnd(source =>
      Wonka.switchMap((.x) => x, Wonka.fromList([source]))
    )
    |> Js.Promise.then_(result =>
         expect(result)
         |> toEqual(([|Pull, Pull, Close|], [|Push(1)|]))
         |> Js.Promise.resolve
       )
  );
});
1301});
1302
/* Specs for the sinks that consume a source: Wonka.forEach and Wonka.subscribe. */
describe("sink factories", () => {
  describe("forEach", () => {
    open Expect;

    it("calls a function for each emission of the passed source", () => {
      let counter = ref(0);
      let collected = [||];

      /* Hand-written source: answers each Pull with the next of 0..3, then with End. */
      let countToFour = sink => {
        sink(.Start((.request) => {
          switch (request) {
          | Pull =>
            if (counter^ < 4) {
              let current = counter^;
              incr(counter);
              sink(.Push(current));
            } else {
              sink(.End);
            }
          | _ => ()
          }
        }));
      };

      countToFour
      |> Wonka.forEach((.value) => Js.Array.push(value, collected) |> ignore);

      expect(collected) |> toEqual([|0, 1, 2, 3|]);
    });
  });

  describe("subscribe", () => {
    open Expect;

    it("calls a function for each emission of the passed source and stops when unsubscribed", () => {
      let counter = ref(0);
      let collected = [||];
      /* Filled in once the source is subscribed to; lets the spec push values by hand. */
      let emit = ref(() => ());

      let manualSource = sink => {
        emit := () => {
          let current = counter^;
          incr(counter);
          sink(.Push(current));
        };

        sink(.Start(Wonka_helpers.talkbackPlaceholder));
      };

      let subscription =
        Wonka.subscribe(
          (.value) => Js.Array.push(value, collected) |> ignore,
          manualSource,
        );

      emit^();
      emit^();
      subscription.unsubscribe();
      /* These two pushes happen after unsubscribing and must not be recorded. */
      emit^();
      emit^();

      expect(collected) |> toEqual([|0, 1|]);
    });
  });
});
1360
/* Integration specs that chain a source factory, an operator and a sink together. */
describe("chains (integration)", () => {
  open Expect;

  it("fromArray, map, forEach", () => {
    /* 0, 1, ..., 999 */
    let input = Array.init(1000, i => i);
    /* Expected result, computed without Wonka: every input rendered as a string. */
    let output = Array.map(string_of_int, input);
    /* Filled by the forEach sink below. */
    let actual = [||];

    input
    |> Wonka.fromArray
    |> Wonka.map((.x) => string_of_int(x))
    |> Wonka.forEach((.x) => ignore(Js.Array.push(x, actual)));

    /* Compare what the chain produced against the expected array. */
    expect(actual) |> toEqual(output)
  });
});
1377
/* Specs for subjects created with Wonka.makeSubject. */
describe("subject", () => {
  open Expect;
  open! Expect.Operators;

  it("sends values passed to .next to puller sinks", () => {
    let received = [||];

    let subject = Wonka.makeSubject();

    /* This sink never pulls; it only records Push and End signals. */
    subject.source((.signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => Js.Array.push(signal, received) |> ignore
      }
    );

    [10, 20, 30, 40] |> List.iter(subject.next);
    subject.complete();

    expect(received) == [|Push(10), Push(20), Push(30), Push(40), End|];
  });

  it("handles multiple sinks", () => {
    let firstTalkback = ref((._: Wonka_types.talkbackT) => ());
    let firstLog = [||];
    let secondLog = [||];

    let subject = Wonka.makeSubject();

    /* First sink keeps its talkback so the spec can close it part-way through. */
    subject.source((.signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, firstLog) |> ignore
      }
    );

    /* Second sink stays subscribed until the subject completes. */
    subject.source((.signal) =>
      switch (signal) {
      | Start(_) => ()
      | Push(_)
      | End => Js.Array.push(signal, secondLog) |> ignore
      }
    );

    subject.next(10);
    subject.next(20);
    subject.next(30);

    firstTalkback^(.Close);

    subject.next(40);
    subject.next(50);

    subject.complete();

    /* The closed sink stops at 30 and sees no End; the other sees everything. */
    expect((firstLog, secondLog))
    == (
         [|Push(10), Push(20), Push(30)|],
         [|Push(10), Push(20), Push(30), Push(40), Push(50), End|],
       );
  });

  it("handles multiple sinks that subscribe and close at different times", () => {
    let firstTalkback = ref((._: Wonka_types.talkbackT) => ());
    let secondTalkback = ref((._: Wonka_types.talkbackT) => ());
    let firstLog = [||];
    let secondLog = [||];

    let subject = Wonka.makeSubject();

    /* Sent before any sink subscribes; the expectation shows they are not replayed. */
    subject.next(10);
    subject.next(20);

    subject.source((.signal) =>
      switch (signal) {
      | Start(tb) => firstTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, firstLog) |> ignore
      }
    );

    subject.next(30);

    subject.source((.signal) =>
      switch (signal) {
      | Start(tb) => secondTalkback := tb
      | Push(_)
      | End => Js.Array.push(signal, secondLog) |> ignore
      }
    );

    subject.next(40);
    subject.next(50);

    secondTalkback^(.Close);

    subject.next(60);

    firstTalkback^(.Close);

    /* Both sinks are closed by now, so neither should record 70 or End. */
    subject.next(70);
    subject.complete();

    expect((firstLog, secondLog))
    == (
         [|Push(30), Push(40), Push(50), Push(60)|],
         [|Push(40), Push(50)|],
       );
  });
});