Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v1.4.4 39 kB view raw
1open Jest; 2open Wonka_types; 3 4let it = test; 5 6describe("source factories", () => { 7 describe("fromList", () => { 8 open Expect; 9 open! Expect.Operators; 10 11 it("sends list items to a puller sink", () => { 12 let source = Wonka.fromList([10, 20, 30]); 13 let talkback = ref((._: Wonka_types.talkbackT) => ()); 14 let signals = [||]; 15 16 source((.signal) => { 17 switch (signal) { 18 | Start(x) => talkback := x; 19 | Push(_) => ignore(Js.Array.push(signal, signals)); 20 | End => ignore(Js.Array.push(signal, signals)) 21 }; 22 }); 23 24 talkback^(.Pull); 25 talkback^(.Pull); 26 talkback^(.Pull); 27 talkback^(.Pull); 28 29 expect(signals) == [| Push(10), Push(20), Push(30), End |]; 30 }); 31 }); 32 33 describe("fromArray", () => { 34 open Expect; 35 open! Expect.Operators; 36 37 it("sends array items to a puller sink", () => { 38 let source = Wonka.fromArray([| 10, 20, 30 |]); 39 let talkback = ref((._: Wonka_types.talkbackT) => ()); 40 let signals = ref([||]); 41 42 source((.signal) => { 43 switch (signal) { 44 | Start(x) => { 45 talkback := x; 46 x(.Pull); 47 } 48 | Push(_) => { 49 signals := Array.append(signals^, [|signal|]); 50 talkback^(.Pull); 51 } 52 | End => signals := Array.append(signals^, [|signal|]); 53 }; 54 }); 55 56 expect(signals^) == [| Push(10), Push(20), Push(30), End |]; 57 }); 58 59 it("does not blow up the stack when iterating something huge", () => { 60 let arr = Array.make(100000, 123); 61 let source = Wonka.fromArray(arr); 62 let talkback = ref((._: Wonka_types.talkbackT) => ()); 63 let values = [||]; 64 65 source((.signal) => { 66 switch (signal) { 67 | Start(x) => { 68 talkback := x; 69 x(.Pull); 70 } 71 | Push(x) => { 72 ignore(Js.Array.push(x, values)); 73 talkback^(.Pull); 74 } 75 | End => () 76 }; 77 }); 78 79 expect(Array.length(values)) == Array.length(arr); 80 }); 81 }); 82 83 describe("fromValue", () => { 84 open Expect; 85 open! Expect.Operators; 86 87 it("sends a single item to a puller sink", () => { 88 let source = Wonka.fromValue(123); 89 let talkback = ref((._: Wonka_types.talkbackT) => ()); 90 let signals = [||]; 91 92 source((.signal) => { 93 switch (signal) { 94 | Start(x) => talkback := x; 95 | Push(_) => ignore(Js.Array.push(signal, signals)); 96 | End => ignore(Js.Array.push(signal, signals)) 97 }; 98 }); 99 100 talkback^(.Pull); 101 talkback^(.Pull); 102 talkback^(.Pull); /* one extra to check whether no signal comes back after it has ended */ 103 104 expect(signals) == [| Push(123), End |]; 105 }); 106 }); 107 108 describe("empty", () => { 109 open Expect; 110 open! Expect.Operators; 111 112 it("ends immediately", () => { 113 let talkback = ref((._: Wonka_types.talkbackT) => ()); 114 let signals = [||]; 115 116 Wonka.empty((.signal) => { 117 switch (signal) { 118 | Start(x) => talkback := x; 119 | _ => ignore(Js.Array.push(signal, signals)) 120 }; 121 }); 122 123 let _signals = Array.copy(signals); 124 125 talkback^(.Pull); 126 talkback^(.Pull); 127 128 expect((_signals, signals)) == ([| End |], [| End |]); 129 }); 130 }); 131 132 describe("never", () => { 133 open Expect; 134 open! Expect.Operators; 135 136 it("does not end", () => { 137 let talkback = ref((._: Wonka_types.talkbackT) => ()); 138 let ended = ref(false); 139 140 Wonka.never((.signal) => { 141 switch (signal) { 142 | Start(x) => talkback := x; 143 | End => ended := true 144 | _ => () 145 }; 146 }); 147 148 talkback^(.Pull); 149 talkback^(.Pull); 150 151 expect(ended^) === false; 152 }); 153 }); 154}); 155 156describe("operator factories", () => { 157 describe("map", () => { 158 open Expect; 159 160 it("maps all emissions of a source", () => { 161 let num = ref(1); 162 let nums = [||]; 163 let talkback = ref((._: Wonka_types.talkbackT) => ()); 164 165 Wonka.map((._) => { 166 let res = num^; 167 num := num^ + 1; 168 res 169 }, sink => { 170 sink(.Start((.signal) => { 171 switch (signal) { 172 | Pull => sink(.Push(1)); 173 | _ => () 174 } 175 })); 176 }, (.signal) => { 177 switch (signal) { 178 | Start(x) => { 179 talkback := x; 180 x(.Pull); 181 } 182 | Push(x) when num^ < 6 => { 183 ignore(Js.Array.push(x, nums)); 184 talkback^(.Pull); 185 } 186 | _ => () 187 } 188 }); 189 190 expect(nums) |> toEqual([|1, 2, 3, 4|]) 191 }); 192 193 testPromise("follows the spec for listenables", () => { 194 Wonka_thelpers.testWithListenable(Wonka.map((.x) => x)) 195 |> Js.Promise.then_(x => { 196 expect(x) 197 |> toEqual(([||], [| Push(1), Push(2), End |])) 198 |> Js.Promise.resolve 199 }) 200 }); 201 202 testPromise("ends itself and source when its talkback receives the End signal", () => { 203 let end_: talkbackT = Close; 204 205 Wonka_thelpers.testTalkbackEnd(Wonka.map((.x) => x)) 206 |> Js.Promise.then_(x => { 207 expect(x) 208 |> toEqual(([| end_ |], [| Push(1) |])) 209 |> Js.Promise.resolve 210 }) 211 }); 212 }); 213 214 describe("filter", () => { 215 open Expect; 216 217 it("filters emissions according to a predicate", () => { 218 let i = ref(1); 219 let nums = [||]; 220 let talkback = ref((._: Wonka_types.talkbackT) => ()); 221 222 Wonka.filter((.x) => x mod 2 === 0, sink => { 223 sink(.Start((.signal) => { 224 switch (signal) { 225 | Pull => { 226 let num = i^; 227 i := i^ + 1; 228 sink(.Push(num)); 229 } 230 | _ => () 231 } 232 })); 233 }, (.signal) => { 234 switch (signal) { 235 | Start(x) => { 236 talkback := x; 237 x(.Pull); 238 } 239 | Push(x) when x < 6 => { 240 ignore(Js.Array.push(x, nums)); 241 talkback^(.Pull); 242 } 243 | _ => () 244 } 245 }); 246 247 expect(nums) |> toEqual([|2, 4|]) 248 }); 249 250 testPromise("follows the spec for listenables", () => { 251 Wonka_thelpers.testWithListenable(Wonka.filter((._) => true)) 252 |> Js.Promise.then_(x => { 253 expect(x) 254 |> toEqual(([||], [| Push(1), Push(2), End |])) 255 |> Js.Promise.resolve 256 }) 257 }); 258 259 testPromise("follows the spec for listenables when filtering", () => { 260 Wonka_thelpers.testWithListenable(Wonka.filter((._) => false)) 261 |> Js.Promise.then_(x => { 262 expect(x) 263 |> toEqual(([| Pull, Pull |], [| End |])) 264 |> Js.Promise.resolve 265 }) 266 }); 267 268 testPromise("ends itself and source when its talkback receives the End signal", () => { 269 let end_: talkbackT = Close; 270 271 Wonka_thelpers.testTalkbackEnd(Wonka.filter((._) => true)) 272 |> Js.Promise.then_(x => { 273 expect(x) 274 |> toEqual(([| end_ |], [| Push(1) |])) 275 |> Js.Promise.resolve 276 }) 277 }); 278 }); 279 280 describe("scan", () => { 281 open Expect; 282 283 it("folds emissions using an initial seed value", () => { 284 let talkback = ref((._: Wonka_types.talkbackT) => ()); 285 let num = ref(1); 286 287 let source = Wonka.scan((.acc, x) => acc + x, 0, sink => sink(.Start((.signal) => { 288 switch (signal) { 289 | Pull => { 290 let i = num^; 291 if (i <= 3) { 292 num := num^ + 1; 293 sink(.Push(i)); 294 } else { 295 sink(.End); 296 } 297 } 298 | _ => () 299 } 300 }))); 301 302 let res = [||]; 303 304 source((.signal) => { 305 switch (signal) { 306 | Start(x) => talkback := x 307 | _ => ignore(Js.Array.push(signal, res)) 308 } 309 }); 310 311 talkback^(.Pull); 312 talkback^(.Pull); 313 talkback^(.Pull); 314 talkback^(.Pull); 315 expect(res) |> toEqual([| Push(1), Push(3), Push(6), End |]); 316 }); 317 318 testPromise("follows the spec for listenables", () => { 319 Wonka_thelpers.testWithListenable(Wonka.scan((._, x) => x, 0)) 320 |> Js.Promise.then_(x => { 321 expect(x) 322 |> toEqual(([||], [| Push(1), Push(2), End |])) 323 |> Js.Promise.resolve 324 }) 325 }); 326 327 testPromise("ends itself and source when its talkback receives the End signal", () => { 328 let end_: talkbackT = Close; 329 330 Wonka_thelpers.testTalkbackEnd(Wonka.scan((._, x) => x, 0)) 331 |> Js.Promise.then_(x => { 332 expect(x) 333 |> toEqual(([| end_ |], [| Push(1) |])) 334 |> Js.Promise.resolve 335 }) 336 }); 337 }); 338 339 describe("merge", () => { 340 open Expect; 341 open! Expect.Operators; 342 343 it("merges different sources into a single one", () => { 344 let a = Wonka.fromList([1, 2, 3]); 345 let b = Wonka.fromList([4, 5, 6]); 346 let talkback = ref((._: Wonka_types.talkbackT) => ()); 347 let signals = [||]; 348 let source = Wonka.merge([| a, b |]); 349 350 source((.signal) => { 351 switch (signal) { 352 | Start(x) => { 353 talkback := x; 354 x(.Pull); 355 } 356 | Push(_) => { 357 ignore(Js.Array.push(signal, signals)); 358 talkback^(.Pull); 359 } 360 | End => ignore(Js.Array.push(signal, signals)) 361 }; 362 }); 363 364 expect(signals) == [| Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End |]; 365 }); 366 367 testPromise("follows the spec for listenables", () => { 368 Wonka_thelpers.testWithListenable(source => Wonka.merge([|source|])) 369 |> Js.Promise.then_(x => { 370 expect(x) 371 |> toEqual(([| Pull, Pull, Pull |], [| Push(1), Push(2), End |])) 372 |> Js.Promise.resolve 373 }) 374 }); 375 376 testPromise("ends itself and source when its talkback receives the End signal", () => { 377 Wonka_thelpers.testTalkbackEnd(source => Wonka.merge([|source|])) 378 |> Js.Promise.then_(x => { 379 expect(x) 380 |> toEqual(([| Pull, Pull, Close |], [| Push(1) |])) 381 |> Js.Promise.resolve 382 }) 383 }); 384 }); 385 386 describe("concat", () => { 387 open Expect; 388 open! Expect.Operators; 389 390 it("concatenates different sources into a single one", () => { 391 let a = Wonka.fromList([1, 2, 3]); 392 let b = Wonka.fromList([4, 5, 6]); 393 let talkback = ref((._: Wonka_types.talkbackT) => ()); 394 let signals = [||]; 395 let source = Wonka.concat([| a, b |]); 396 397 source((.signal) => { 398 switch (signal) { 399 | Start(x) => { 400 talkback := x; 401 x(.Pull); 402 } 403 | Push(_) => { 404 ignore(Js.Array.push(signal, signals)); 405 talkback^(.Pull); 406 } 407 | End => ignore(Js.Array.push(signal, signals)) 408 }; 409 }); 410 411 expect(signals) == [| Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End |]; 412 }); 413 414 testPromise("follows the spec for listenables", () => { 415 Wonka_thelpers.testWithListenable(source => Wonka.concat([|source|])) 416 |> Js.Promise.then_(x => { 417 expect(x) 418 |> toEqual(([| Pull, Pull, Pull |], [| Push(1), Push(2), End |])) 419 |> Js.Promise.resolve 420 }) 421 }); 422 423 testPromise("ends itself and source when its talkback receives the End signal", () => { 424 Wonka_thelpers.testTalkbackEnd(source => Wonka.concat([|source|])) 425 |> Js.Promise.then_(x => { 426 expect(x) 427 |> toEqual(([| Pull, Pull, Close |], [| Push(1) |])) 428 |> Js.Promise.resolve 429 }) 430 }); 431 }); 432 433 describe("share", () => { 434 open Expect; 435 436 it("shares an underlying source with all sinks", () => { 437 let talkback = ref((._: Wonka_types.talkbackT) => ()); 438 let aborterTb = ref((._: Wonka_types.talkbackT) => ()); 439 let num = ref(1); 440 let nums = [||]; 441 442 let source = Wonka.share(sink => { 443 sink(.Start((.signal) => { 444 switch (signal) { 445 | Pull => { 446 let i = num^; 447 if (i <= 2) { 448 num := num^ + 1; 449 sink(.Push(i)); 450 } else { 451 sink(.End); 452 } 453 } 454 | _ => () 455 } 456 })); 457 }); 458 459 source((.signal) => { 460 switch (signal) { 461 | Start(x) => talkback := x 462 | _ => ignore(Js.Array.push(signal, nums)) 463 } 464 }); 465 466 source((.signal) => { 467 switch (signal) { 468 | Start(_) => () 469 | _ => ignore(Js.Array.push(signal, nums)) 470 } 471 }); 472 473 source((.signal) => { 474 switch (signal) { 475 | Start(tb) => aborterTb := tb 476 | _ => { 477 ignore(Js.Array.push(signal, nums)); 478 aborterTb^(.Close); 479 } 480 } 481 }); 482 483 talkback^(.Pull); 484 let numsA = Array.copy(nums); 485 talkback^(.Pull); 486 talkback^(.Pull); 487 talkback^(.Pull); 488 expect((numsA, nums)) |> toEqual(([| Push(1), Push(1), Push(1) |], [| Push(1), Push(1), Push(1), Push(2), Push(2), End, End |])); 489 }); 490 491 testPromise("follows the spec for listenables", () => { 492 Wonka_thelpers.testWithListenable(Wonka.share) 493 |> Js.Promise.then_(x => { 494 expect(x) 495 |> toEqual(([||], [| Push(1), Push(2), End |])) 496 |> Js.Promise.resolve 497 }) 498 }); 499 500 testPromise("ends itself and source when its talkback receives the End signal", () => { 501 let end_: talkbackT = Close; 502 503 Wonka_thelpers.testTalkbackEnd(Wonka.share) 504 |> Js.Promise.then_(x => { 505 expect(x) 506 |> toEqual(([| end_ |], [| Push(1) |])) 507 |> Js.Promise.resolve 508 }) 509 }); 510 }); 511 512 describe("combine", () => { 513 open Expect; 514 515 it("combines the latest values of two sources", () => { 516 let talkback = ref((._: Wonka_types.talkbackT) => ()); 517 518 let makeSource = (factor: int) => { 519 let num = ref(1); 520 521 sink => { 522 sink(.Start((.signal) => { 523 switch (signal) { 524 | Pull => { 525 if (num^ <= 2) { 526 let i = num^ * factor; 527 num := num^ + 1; 528 sink(.Push(i)); 529 } else { 530 sink(.End); 531 } 532 } 533 | _ => () 534 } 535 })); 536 } 537 }; 538 539 let sourceA = makeSource(1); 540 let sourceB = makeSource(2); 541 let source = Wonka.combine(sourceA, sourceB); 542 let res = [||]; 543 544 source((.signal) => { 545 switch (signal) { 546 | Start(x) => talkback := x 547 | _ => ignore(Js.Array.push(signal, res)) 548 } 549 }); 550 551 talkback^(.Pull); 552 talkback^(.Pull); 553 talkback^(.Pull); 554 talkback^(.Pull); 555 expect(res) |> toEqual([| Push((1, 2)), Push((2, 2)), Push((2, 4)), End |]); 556 }); 557 558 testPromise("follows the spec for listenables", () => { 559 Wonka_thelpers.testWithListenable(source => { 560 let shared = Wonka.share(source); 561 Wonka.combine(shared, shared) 562 }) 563 |> Js.Promise.then_(x => { 564 expect(x) 565 |> toEqual(([||], [| Push((1, 1)), Push((2, 1)), Push((2, 2)), End |])) 566 |> Js.Promise.resolve 567 }) 568 }); 569 570 testPromise("ends itself and source when its talkback receives the End signal", () => { 571 let end_: talkbackT = Close; 572 573 Wonka_thelpers.testTalkbackEnd(source => { 574 let shared = Wonka.share(source); 575 Wonka.combine(shared, shared) 576 }) 577 |> Js.Promise.then_(x => { 578 expect(x) 579 |> toEqual(([| end_ |], [| Push((1, 1)) |])) 580 |> Js.Promise.resolve 581 }) 582 }); 583 }); 584 585 describe("take", () => { 586 open Expect; 587 588 it("only lets a maximum number of values through", () => { 589 let talkback = ref((._: Wonka_types.talkbackT) => ()); 590 let num = ref(1); 591 592 let source = Wonka.take(2, sink => sink(.Start((.signal) => { 593 switch (signal) { 594 | Pull => { 595 let i = num^; 596 num := num^ + 1; 597 sink(.Push(i)); 598 } 599 | _ => () 600 } 601 }))); 602 603 let res = [||]; 604 605 source((.signal) => { 606 switch (signal) { 607 | Start(x) => talkback := x 608 | _ => ignore(Js.Array.push(signal, res)) 609 } 610 }); 611 612 talkback^(.Pull); 613 talkback^(.Pull); 614 talkback^(.Pull); 615 talkback^(.Pull); 616 expect(res) |> toEqual([| Push(1), Push(2), End |]); 617 }); 618 619 it("accepts the end of the source when max number of emissions is not reached", () => { 620 let talkback = ref((._: Wonka_types.talkbackT) => ()); 621 let num = ref(1); 622 623 let source = Wonka.take(2, sink => sink(.Start((.signal) => { 624 switch (signal) { 625 | Pull => { 626 let i = num^; 627 if (i < 2) { 628 num := num^ + 1; 629 sink(.Push(i)); 630 } else { 631 sink(.End); 632 } 633 } 634 | _ => () 635 } 636 }))); 637 638 let res = [||]; 639 640 source((.signal) => { 641 switch (signal) { 642 | Start(x) => talkback := x 643 | _ => ignore(Js.Array.push(signal, res)) 644 } 645 }); 646 647 talkback^(.Pull); 648 talkback^(.Pull); 649 talkback^(.Pull); 650 expect(res) |> toEqual([| Push(1), End |]); 651 }); 652 653 testPromise("follows the spec for listenables", () => { 654 Wonka_thelpers.testWithListenable(Wonka.take(10)) 655 |> Js.Promise.then_(x => { 656 expect(x) 657 |> toEqual(([||], [| Push(1), Push(2), End |])) 658 |> Js.Promise.resolve 659 }) 660 }); 661 662 testPromise("follows the spec for listenables when ending the source", () => { 663 let end_: talkbackT = Close; 664 665 Wonka_thelpers.testWithListenable(Wonka.take(1)) 666 |> Js.Promise.then_(x => { 667 expect(x) 668 |> toEqual(([| end_ |], [| Push(1), End |])) 669 |> Js.Promise.resolve 670 }) 671 }); 672 673 testPromise("ends itself and source when its talkback receives the End signal", () => { 674 let end_: talkbackT = Close; 675 676 Wonka_thelpers.testTalkbackEnd(Wonka.take(10)) 677 |> Js.Promise.then_(x => { 678 expect(x) 679 |> toEqual(([| end_ |], [| Push(1) |])) 680 |> Js.Promise.resolve 681 }) 682 }); 683 }); 684 685 describe("takeLast", () => { 686 open Expect; 687 688 it("only lets the last n values through on an entirely new source", () => { 689 let talkback = ref((._: Wonka_types.talkbackT) => ()); 690 let num = ref(1); 691 692 let source = Wonka.takeLast(2, sink => sink(.Start((.signal) => { 693 switch (signal) { 694 | Pull when num^ <= 4 => { 695 let i = num^; 696 num := num^ + 1; 697 sink(.Push(i)); 698 } 699 | Pull => sink(.End) 700 | _ => () 701 } 702 }))); 703 704 let res = [||]; 705 706 source((.signal) => { 707 switch (signal) { 708 | Start(x) => talkback := x 709 | _ => ignore(Js.Array.push(signal, res)) 710 } 711 }); 712 713 talkback^(.Pull); 714 talkback^(.Pull); 715 talkback^(.Pull); 716 expect(res) |> toEqual([| Push(3), Push(4), End |]); 717 }); 718 719 testPromise("follows the spec for listenables", () => { 720 Wonka_thelpers.testWithListenable(Wonka.takeLast(10)) 721 |> Js.Promise.then_(x => { 722 expect(x) 723 |> toEqual(([| Pull, Pull, Pull |], [| /* empty since the source is a pullable */ |])) 724 |> Js.Promise.resolve 725 }) 726 }); 727 728 testPromise("ends itself and source when its talkback receives the End signal", () => { 729 Wonka_thelpers.testTalkbackEnd(Wonka.takeLast(10)) 730 |> Js.Promise.then_(x => { 731 expect(x) 732 |> toEqual(([| Pull, Pull |], [| |])) 733 |> Js.Promise.resolve 734 }) 735 }); 736 }); 737 738 describe("takeWhile", () => { 739 open Expect; 740 741 it("only lets the last n values through on an entirely new source", () => { 742 let talkback = ref((._: Wonka_types.talkbackT) => ()); 743 let num = ref(1); 744 745 let source = Wonka.takeWhile((.x) => x <= 2, sink => sink(.Start((.signal) => { 746 switch (signal) { 747 | Pull => { 748 let i = num^; 749 num := num^ + 1; 750 sink(.Push(i)); 751 } 752 | _ => () 753 } 754 }))); 755 756 let res = [||]; 757 758 source((.signal) => { 759 switch (signal) { 760 | Start(x) => talkback := x 761 | _ => ignore(Js.Array.push(signal, res)) 762 } 763 }); 764 765 talkback^(.Pull); 766 talkback^(.Pull); 767 talkback^(.Pull); 768 talkback^(.Pull); 769 770 expect(res) |> toEqual([| Push(1), Push(2), End |]); 771 }); 772 773 it("accepts the end of the source when max number of emissions is not reached", () => { 774 let talkback = ref((._: Wonka_types.talkbackT) => ()); 775 let num = ref(1); 776 777 let source = Wonka.takeWhile((.x) => x <= 5, sink => sink(.Start((.signal) => { 778 switch (signal) { 779 | Pull => { 780 let i = num^; 781 if (i < 2) { 782 num := num^ + 1; 783 sink(.Push(i)); 784 } else { 785 sink(.End); 786 } 787 } 788 | _ => () 789 } 790 }))); 791 792 let res = [||]; 793 794 source((.signal) => { 795 switch (signal) { 796 | Start(x) => talkback := x 797 | _ => ignore(Js.Array.push(signal, res)) 798 } 799 }); 800 801 talkback^(.Pull); 802 talkback^(.Pull); 803 talkback^(.Pull); 804 805 expect(res) |> toEqual([| Push(1), End |]); 806 }); 807 808 testPromise("follows the spec for listenables", () => { 809 Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => true)) 810 |> Js.Promise.then_(x => { 811 expect(x) 812 |> toEqual(([||], [| Push(1), Push(2), End |])) 813 |> Js.Promise.resolve 814 }) 815 }); 816 817 testPromise("follows the spec for listenables when ending the source", () => { 818 let end_: talkbackT = Close; 819 820 Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => false)) 821 |> Js.Promise.then_(x => { 822 expect(x) 823 |> toEqual(([| end_ |], [| End |])) 824 |> Js.Promise.resolve 825 }) 826 }); 827 828 testPromise("ends itself and source when its talkback receives the End signal", () => { 829 let end_: talkbackT = Close; 830 831 Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((._) => true)) 832 |> Js.Promise.then_(x => { 833 expect(x) 834 |> toEqual(([| end_ |], [| Push(1) |])) 835 |> Js.Promise.resolve 836 }) 837 }); 838 }); 839 840 describe("takeUntil", () => { 841 open Expect; 842 843 it("only lets the last n values through on an entirely new source", () => { 844 let talkback = ref((._: Wonka_types.talkbackT) => ()); 845 let notify = ref((_: Wonka_types.talkbackT) => ()); 846 let num = ref(1); 847 848 let notifier = sink => { 849 notify := signal => switch (signal) { 850 | Pull => sink(.Push(0)); 851 | _ => () 852 }; 853 854 sink(.Start(Wonka_helpers.talkbackPlaceholder)); 855 }; 856 857 let source = Wonka.takeUntil(notifier, sink => sink(.Start((.signal) => { 858 switch (signal) { 859 | Pull when num^ <= 4 => { 860 let i = num^; 861 if (i === 3) notify^(Pull); 862 num := num^ + 1; 863 sink(.Push(i)); 864 } 865 | _ => () 866 } 867 }))); 868 869 let res = [||]; 870 871 source((.signal) => { 872 switch (signal) { 873 | Start(x) => talkback := x 874 | _ => ignore(Js.Array.push(signal, res)) 875 } 876 }); 877 878 talkback^(.Pull); 879 talkback^(.Pull); 880 talkback^(.Pull); 881 talkback^(.Pull); 882 883 expect(res) |> toEqual([| Push(1), Push(2), End |]); 884 }); 885 886 it("accepts the end of the source when max number of emissions is not reached", () => { 887 let talkback = ref((._: Wonka_types.talkbackT) => ()); 888 let num = ref(1); 889 let notifier = sink => sink(.Start(Wonka_helpers.talkbackPlaceholder)); 890 891 let source = Wonka.takeUntil(notifier, sink => sink(.Start((.signal) => { 892 switch (signal) { 893 | Pull => { 894 let i = num^; 895 if (num^ <= 2) { 896 num := num^ + 1; 897 sink(.Push(i)); 898 } else { 899 sink(.End); 900 } 901 } 902 | _ => () 903 } 904 }))); 905 906 let res = [||]; 907 908 source((.signal) => { 909 switch (signal) { 910 | Start(x) => talkback := x 911 | _ => ignore(Js.Array.push(signal, res)) 912 } 913 }); 914 915 talkback^(.Pull); 916 talkback^(.Pull); 917 talkback^(.Pull); 918 talkback^(.Pull); 919 920 expect(res) |> toEqual([| Push(1), Push(2), End |]); 921 }); 922 923 testPromise("follows the spec for listenables", () => { 924 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.never)) 925 |> Js.Promise.then_(x => { 926 expect(x) 927 |> toEqual(([||], [| Push(1), Push(2), End |])) 928 |> Js.Promise.resolve 929 }) 930 }); 931 932 testPromise("follows the spec for listenables when ending the source", () => { 933 let end_: talkbackT = Close; 934 935 Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.fromValue(0))) 936 |> Js.Promise.then_(x => { 937 expect(x) 938 |> toEqual(([| end_ |], [| End |])) 939 |> Js.Promise.resolve 940 }) 941 }); 942 943 testPromise("ends itself and source when its talkback receives the End signal", () => { 944 let end_: talkbackT = Close; 945 946 Wonka_thelpers.testTalkbackEnd(Wonka.takeUntil(Wonka.never)) 947 |> Js.Promise.then_(x => { 948 expect(x) 949 |> toEqual(([| end_ |], [| Push(1) |])) 950 |> Js.Promise.resolve 951 }) 952 }); 953 }); 954 955 describe("skip", () => { 956 open Expect; 957 958 it("only lets values through after a number of values have been filtered out", () => { 959 let talkback = ref((._: Wonka_types.talkbackT) => ()); 960 let num = ref(1); 961 962 let source = Wonka.skip(2, sink => sink(.Start((.signal) => { 963 switch (signal) { 964 | Pull when num^ <= 4 => { 965 let i = num^; 966 num := num^ + 1; 967 sink(.Push(i)); 968 } 969 | Pull => sink(.End) 970 | _ => () 971 } 972 }))); 973 974 let res = [||]; 975 976 source((.signal) => { 977 switch (signal) { 978 | Start(x) => talkback := x 979 | _ => ignore(Js.Array.push(signal, res)) 980 } 981 }); 982 983 talkback^(.Pull); 984 talkback^(.Pull); 985 talkback^(.Pull); 986 expect(res) |> toEqual([| Push(3), Push(4), End |]); 987 }); 988 989 testPromise("follows the spec for listenables", () => { 990 Wonka_thelpers.testWithListenable(Wonka.skip(0)) 991 |> Js.Promise.then_(x => { 992 expect(x) 993 |> toEqual(([||], [| Push(1), Push(2), End |])) 994 |> Js.Promise.resolve 995 }) 996 }); 997 998 testPromise("follows the spec for listenables when skipping the source", () => { 999 Wonka_thelpers.testWithListenable(Wonka.skip(10)) 1000 |> Js.Promise.then_(x => { 1001 expect(x) 1002 |> toEqual(([| Pull, Pull |], [| End |])) 1003 |> Js.Promise.resolve 1004 }) 1005 }); 1006 1007 testPromise("ends itself and source when its talkback receives the End signal", () => { 1008 let end_: talkbackT = Close; 1009 1010 Wonka_thelpers.testTalkbackEnd(Wonka.skip(10)) 1011 |> Js.Promise.then_(x => { 1012 expect(x) 1013 |> toEqual(([| Pull, end_ |], [| |])) 1014 |> Js.Promise.resolve 1015 }) 1016 }); 1017 }); 1018 1019 describe("skipWhile", () => { 1020 open Expect; 1021 1022 it("only lets values through after the predicate returned false, including the first such value", () => { 1023 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1024 let num = ref(1); 1025 1026 let source = Wonka.skipWhile((.x) => x <= 2, sink => sink(.Start((.signal) => { 1027 switch (signal) { 1028 | Pull when num^ <= 4 => { 1029 let i = num^; 1030 num := num^ + 1; 1031 sink(.Push(i)); 1032 } 1033 | Pull => sink(.End) 1034 | _ => () 1035 } 1036 }))); 1037 1038 let res = [||]; 1039 1040 source((.signal) => { 1041 switch (signal) { 1042 | Start(x) => talkback := x 1043 | _ => ignore(Js.Array.push(signal, res)) 1044 } 1045 }); 1046 1047 talkback^(.Pull); 1048 talkback^(.Pull); 1049 talkback^(.Pull); 1050 expect(res) |> toEqual([| Push(3), Push(4), End |]); 1051 }); 1052 1053 testPromise("follows the spec for listenables", () => { 1054 Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => false)) 1055 |> Js.Promise.then_(x => { 1056 expect(x) 1057 |> toEqual(([||], [| Push(1), Push(2), End |])) 1058 |> Js.Promise.resolve 1059 }) 1060 }); 1061 1062 testPromise("follows the spec for listenables when skipping the source", () => { 1063 Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => true)) 1064 |> Js.Promise.then_(x => { 1065 expect(x) 1066 |> toEqual(([| Pull, Pull |], [| End |])) 1067 |> Js.Promise.resolve 1068 }) 1069 }); 1070 1071 testPromise("ends itself and source when its talkback receives the End signal", () => { 1072 let end_: talkbackT = Close; 1073 1074 Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((._) => false)) 1075 |> Js.Promise.then_(x => { 1076 expect(x) 1077 |> toEqual(([| end_ |], [| Push(1) |])) 1078 |> Js.Promise.resolve 1079 }) 1080 }); 1081 }); 1082 1083 describe("skipUntil", () => { 1084 open Expect; 1085 1086 it("only lets values through after the notifier emits a value", () => { 1087 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1088 let notify = ref((_: Wonka_types.talkbackT) => ()); 1089 let num = ref(1); 1090 1091 let notifier = sink => { 1092 notify := signal => switch (signal) { 1093 | Pull => sink(.Push(0)); 1094 | _ => () 1095 }; 1096 1097 sink(.Start(Wonka_helpers.talkbackPlaceholder)); 1098 }; 1099 1100 let source = Wonka.skipUntil(notifier, (sink) => sink(.Start((.signal) => { 1101 switch (signal) { 1102 | Pull when num^ <= 4 => { 1103 let i = num^; 1104 if (i === 3) notify^(Pull); 1105 num := num^ + 1; 1106 sink(.Push(i)); 1107 } 1108 | Pull => sink(.End) 1109 | _ => () 1110 } 1111 }))); 1112 1113 let res = [||]; 1114 1115 source((.signal) => { 1116 switch (signal) { 1117 | Start(x) => talkback := x 1118 | _ => ignore(Js.Array.push(signal, res)) 1119 } 1120 }); 1121 1122 talkback^(.Pull); 1123 talkback^(.Pull); 1124 talkback^(.Pull); 1125 talkback^(.Pull); 1126 1127 expect(res) |> toEqual([| Push(3), Push(4), End |]); 1128 }); 1129 1130 it("accepts the end of the source when max number of emissions is not reached", () => { 1131 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1132 let num = ref(1); 1133 let notifier = sink => sink(.Start(Wonka_helpers.talkbackPlaceholder)); 1134 1135 let source = Wonka.skipUntil(notifier, (sink) => sink(.Start((.signal) => { 1136 switch (signal) { 1137 | Pull => { 1138 let i = num^; 1139 if (i < 2) { 1140 num := num^ + 1; 1141 sink(.Push(i)); 1142 } else { 1143 sink(.End); 1144 } 1145 } 1146 | _ => () 1147 } 1148 }))); 1149 1150 let res = [||]; 1151 1152 source((.signal) => { 1153 switch (signal) { 1154 | Start(x) => talkback := x 1155 | _ => ignore(Js.Array.push(signal, res)) 1156 } 1157 }); 1158 1159 talkback^(.Pull); 1160 talkback^(.Pull); 1161 talkback^(.Pull); 1162 1163 expect(res) |> toEqual([| End |]); 1164 }); 1165 1166 testPromise("follows the spec for listenables", () => { 1167 Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.never)) 1168 |> Js.Promise.then_(x => { 1169 expect(x) 1170 |> toEqual(([| Pull, Pull, Pull |], [| End |])) 1171 |> Js.Promise.resolve 1172 }) 1173 }); 1174 1175 testPromise("follows the spec for listenables when skipping the source", () => { 1176 Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.fromValue(0))) 1177 |> Js.Promise.then_(x => { 1178 expect(x) 1179 |> toEqual(([| Pull |], [| Push(1), Push(2), End |])) 1180 |> Js.Promise.resolve 1181 }) 1182 }); 1183 1184 testPromise("ends itself and source when its talkback receives the End signal", () => { 1185 let end_: talkbackT = Close; 1186 1187 Wonka_thelpers.testTalkbackEnd(Wonka.skipUntil(Wonka.fromValue(0))) 1188 |> Js.Promise.then_(x => { 1189 expect(x) 1190 |> toEqual(([| Pull, end_ |], [| Push(1) |])) 1191 |> Js.Promise.resolve 1192 }) 1193 }); 1194 }); 1195 1196 describe("flatten", () => { 1197 open Expect; 1198 1199 it("merges the result of multiple pullables into its source", () => { 1200 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1201 let source = Wonka.fromList([ Wonka.fromList([ 1, 2 ]), Wonka.fromList([ 1, 2 ]) ]) 1202 |> Wonka.flatten; 1203 1204 let res = [||]; 1205 1206 source((.signal) => { 1207 switch (signal) { 1208 | Start(x) => talkback := x 1209 | _ => ignore(Js.Array.push(signal, res)) 1210 } 1211 }); 1212 1213 talkback^(.Pull); 1214 talkback^(.Pull); 1215 talkback^(.Pull); 1216 talkback^(.Pull); 1217 talkback^(.Pull); 1218 expect(res) |> toEqual([| Push(1), Push(2), Push(1), Push(2), End |]); 1219 }); 1220 }); 1221 1222 describe("switchMap", () => { 1223 afterEach(() => Jest.useRealTimers()); 1224 1225 open Expect; 1226 open! Expect.Operators; 1227 1228 it("maps from a source and switches to a new source", () => { 1229 let a = Wonka.fromList([1, 2, 3]); 1230 1231 let talkback = ref((. _: Wonka_types.talkbackT) => ()); 1232 let signals = [||]; 1233 let source = Wonka.switchMap((.x) => Wonka.fromList([x * x]), a); 1234 1235 source((.signal) => 1236 switch (signal) { 1237 | Start(x) => 1238 talkback := x; 1239 x(.Pull); 1240 | Push(_) => 1241 ignore(Js.Array.push(signal, signals)); 1242 talkback^(.Pull); 1243 | End => ignore(Js.Array.push(signal, signals)) 1244 } 1245 ); 1246 1247 expect(signals) == [|Push(1), Push(4), Push(9), End|]; 1248 }); 1249 1250 it("unsubscribes from previous subscriptions", () => { 1251 Jest.useFakeTimers(); 1252 1253 let a = Wonka.interval(100); 1254 1255 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1256 let signals = [||]; 1257 let source = 1258 Wonka.switchMap((._) => Wonka.interval(25), a) |> Wonka.take(5); 1259 1260 source((.signal) => 1261 switch (signal) { 1262 | Start(x) => 1263 talkback := x; 1264 x(.Pull); 1265 | Push(_) => 1266 ignore(Js.Array.push(signal, signals)); 1267 talkback^(.Pull); 1268 | End => ignore(Js.Array.push(signal, signals)) 1269 } 1270 ); 1271 1272 Jest.runTimersToTime(300); 1273 1274 expect(signals) 1275 == [|Push(0), Push(1), Push(2), Push(0), Push(1), End|]; 1276 }); 1277 1278 testPromise("follows the spec for listenables", () => 1279 Wonka_thelpers.testWithListenable(source => 1280 Wonka.switchMap((.x) => x, Wonka.fromList([source])) 1281 ) 1282 |> Js.Promise.then_(x => 1283 expect(x) 1284 |> toEqual(([|Pull, Pull, Pull|], [|Push(1), Push(2), End|])) 1285 |> Js.Promise.resolve 1286 ) 1287 ); 1288 1289 testPromise( 1290 "ends itself and source when its talkback receives the End signal", () => 1291 Wonka_thelpers.testTalkbackEnd(source => 1292 Wonka.switchMap((.x) => x, Wonka.fromList([source])) 1293 ) 1294 |> Js.Promise.then_(x => 1295 expect(x) 1296 |> toEqual(([|Pull, Pull, Close|], [|Push(1)|])) 1297 |> Js.Promise.resolve 1298 ) 1299 ); 1300 }); 1301}); 1302 1303describe("sink factories", () => { 1304 describe("forEach", () => { 1305 open Expect; 1306 1307 it("calls a function for each emission of the passed source", () => { 1308 let i = ref(0); 1309 let nums = [||]; 1310 1311 let source = sink => { 1312 sink(.Start((.signal) => { 1313 switch (signal) { 1314 | Pull when i^ < 4 => { 1315 let num = i^; 1316 i := i^ + 1; 1317 sink(.Push(num)); 1318 } 1319 | Pull => sink(.End) 1320 | _ => () 1321 } 1322 })); 1323 }; 1324 1325 Wonka.forEach((.x) => ignore(Js.Array.push(x, nums)), source); 1326 expect(nums) |> toEqual([| 0, 1, 2, 3 |]) 1327 }); 1328 }); 1329 1330 describe("subscribe", () => { 1331 open Expect; 1332 1333 it("calls a function for each emission of the passed source and stops when unsubscribed", () => { 1334 let i = ref(0); 1335 let nums = [||]; 1336 let push = ref(() => ()); 1337 1338 let source = sink => { 1339 push := () => { 1340 let num = i^; 1341 i := i^ + 1; 1342 sink(.Push(num)); 1343 }; 1344 1345 sink(.Start(Wonka_helpers.talkbackPlaceholder)); 1346 }; 1347 1348 let { unsubscribe } = Wonka.subscribe((.x) => ignore(Js.Array.push(x, nums)), source); 1349 1350 push^(); 1351 push^(); 1352 unsubscribe(); 1353 push^(); 1354 push^(); 1355 1356 expect(nums) |> toEqual([| 0, 1 |]) 1357 }); 1358 }); 1359}); 1360 1361describe("chains (integration)", () => { 1362 open Expect; 1363 1364 it("fromArray, map, forEach", () => { 1365 let input = Array.mapi((i, _) => i, Array.make(1000, 1)); 1366 let output = Array.map(x => string_of_int(x)); 1367 let actual = [||]; 1368 1369 input 1370 |> Wonka.fromArray 1371 |> Wonka.map((.x) => string_of_int(x)) 1372 |> Wonka.forEach((.x) => ignore(Js.Array.push(x, actual))); 1373 1374 expect(output) |> toEqual(output) 1375 }); 1376}); 1377 1378describe("subject", () => { 1379 open Expect; 1380 open! Expect.Operators; 1381 1382 it("sends values passed to .next to puller sinks", () => { 1383 let signals = [||]; 1384 1385 let subject = Wonka.makeSubject(); 1386 1387 subject.source((.signal) => 1388 switch (signal) { 1389 | Start(_) => ignore() 1390 | Push(_) => ignore(Js.Array.push(signal, signals)) 1391 | End => ignore(Js.Array.push(signal, signals)) 1392 } 1393 ); 1394 1395 subject.next(10); 1396 subject.next(20); 1397 subject.next(30); 1398 subject.next(40); 1399 subject.complete(); 1400 1401 expect(signals) == [|Push(10), Push(20), Push(30), Push(40), End|]; 1402 }); 1403 1404 it("handles multiple sinks", () => { 1405 let talkback = ref((._: Wonka_types.talkbackT) => ()); 1406 let signalsOne = [||]; 1407 let signalsTwo = [||]; 1408 1409 let subject = Wonka.makeSubject(); 1410 1411 subject.source((.signal) => 1412 switch (signal) { 1413 | Start(x) => talkback := x 1414 | Push(_) => ignore(Js.Array.push(signal, signalsOne)) 1415 | End => ignore(Js.Array.push(signal, signalsOne)) 1416 } 1417 ); 1418 1419 subject.source((.signal) => 1420 switch (signal) { 1421 | Start(_) => ignore() 1422 | Push(_) => ignore(Js.Array.push(signal, signalsTwo)) 1423 | End => ignore(Js.Array.push(signal, signalsTwo)) 1424 } 1425 ); 1426 1427 subject.next(10); 1428 subject.next(20); 1429 subject.next(30); 1430 1431 talkback^(.Close); 1432 1433 subject.next(40); 1434 subject.next(50); 1435 1436 subject.complete(); 1437 1438 expect((signalsOne, signalsTwo)) 1439 == ( 1440 [|Push(10), Push(20), Push(30)|], 1441 [|Push(10), Push(20), Push(30), Push(40), Push(50), End|], 1442 ); 1443 }); 1444 1445 it("handles multiple sinks that subscribe and close at different times", () => { 1446 let talkbackOne = ref((._: Wonka_types.talkbackT) => ()); 1447 let talkbackTwo = ref((._: Wonka_types.talkbackT) => ()); 1448 let signalsOne = [||]; 1449 let signalsTwo = [||]; 1450 1451 let subject = Wonka.makeSubject(); 1452 1453 subject.next(10); 1454 subject.next(20); 1455 1456 subject.source((.signal) => 1457 switch (signal) { 1458 | Start(x) => talkbackOne := x 1459 | Push(_) => ignore(Js.Array.push(signal, signalsOne)) 1460 | End => ignore(Js.Array.push(signal, signalsOne)) 1461 } 1462 ); 1463 1464 subject.next(30); 1465 1466 subject.source((.signal) => 1467 switch (signal) { 1468 | Start(x) => talkbackTwo := x 1469 | Push(_) => ignore(Js.Array.push(signal, signalsTwo)) 1470 | End => ignore(Js.Array.push(signal, signalsTwo)) 1471 } 1472 ); 1473 1474 subject.next(40); 1475 subject.next(50); 1476 1477 talkbackTwo^(.Close); 1478 1479 subject.next(60); 1480 1481 talkbackOne^(.Close); 1482 1483 subject.next(70); 1484 subject.complete(); 1485 1486 expect((signalsOne, signalsTwo)) 1487 == ( 1488 [|Push(30), Push(40), Push(50), Push(60)|], 1489 [|Push(40), Push(50)|], 1490 ); 1491 }); 1492});