Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add spec/listenable-related tests and fix up small discrepancies

+3
.jest.coverage.json
···
"moduleFileExtensions": [
"js"
],
+
"testMatch": [
+
"**/__tests__/*_test.js"
+
],
"modulePathIgnorePatterns": [
"/es6/"
]
+353 -1
__tests__/wonka_test.re
···
});
});
-
describe("empty", () => {
+
describe("never", () => {
open Expect;
open! Expect.Operators;
···
expect(nums) |> toEqual([|1, 2, 3, 4|])
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.map(x => x))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.map(x => x))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("filter", () => {
···
expect(nums) |> toEqual([|2, 4|])
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.filter((_) => true))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when filtering", () => {
+
Wonka_thelpers.testWithListenable(Wonka.filter((_) => false))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.filter((_) => true))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("scan", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push(1), Push(3), Push(6), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.scan((_, x) => x, 0))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.scan((_, x) => x, 0))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("merge", () => {
···
expect(signals) == [| Push(1), Push(4), Push(5), Push(6), Push(2), Push(3), End |];
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(source => Wonka.merge([|source|]))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(source => Wonka.merge([|source|]))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("concat", () => {
···
});
expect(signals) == [| Push(1), Push(2), Push(3), Push(4), Push(5), Push(6), End |];
+
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(source => Wonka.concat([|source|]))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull |], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(source => Wonka.concat([|source|]))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
});
});
···
talkback^(Pull);
expect((numsA, nums)) |> toEqual(([| Push(1), Push(1), Push(1) |], [| Push(1), Push(1), Push(1), Push(2), Push(2), End, End |]));
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.share)
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.share)
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("combine", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push((1, 2)), Push((2, 2)), Push((2, 4)), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(source => {
+
let shared = Wonka.share(source);
+
Wonka.combine(shared, shared)
+
})
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push((1, 1)), Push((2, 1)), Push((2, 2)), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(source => {
+
let shared = Wonka.share(source);
+
Wonka.combine(shared, shared)
+
})
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push((1, 1)) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("take", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push(1), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.take(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when ending the source", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testWithListenable(Wonka.take(1))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.take(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("takeLast", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push(3), Push(4), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.takeLast(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull, Pull |], [| /* empty since the source is a pullable */ |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
Wonka_thelpers.testTalkbackEnd(Wonka.takeLast(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull |], [| |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("takeWhile", () => {
···
expect(res) |> toEqual([| Push(1), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.takeWhile((_) => true))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when ending the source", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testWithListenable(Wonka.takeWhile((_) => false))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((_) => true))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("takeUntil", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push(1), Push(2), End |]);
+
});
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.never))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when ending the source", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testWithListenable(Wonka.takeUntil(Wonka.fromValue(0)))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.takeUntil(Wonka.never))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
});
});
···
talkback^(Pull);
expect(res) |> toEqual([| Push(3), Push(4), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skip(0))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when skipping the source", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skip(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.skip(10))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, end_ |], [| |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("skipWhile", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| Push(3), Push(4), End |]);
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipWhile((_) => false))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([||], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when skipping the source", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipWhile((_) => true))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((_) => false))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
+
});
});
describe("skipUntil", () => {
···
talkback^(Pull);
expect(res) |> toEqual([| End |]);
+
});
+
+
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.never))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, Pull, Pull |], [| End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("follows the spec for listenables when skipping the source", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipUntil(Wonka.fromValue(0)))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull |], [| Push(1), Push(2), End |]))
+
|> Js.Promise.resolve
+
})
+
});
+
+
testPromise("ends itself and source when its talkback receives the End signal", () => {
+
let end_: talkbackT = End;
+
+
Wonka_thelpers.testTalkbackEnd(Wonka.skipUntil(Wonka.fromValue(0)))
+
|> Js.Promise.then_(x => {
+
expect(x)
+
|> toEqual(([| Pull, end_ |], [| Push(1) |]))
+
|> Js.Promise.resolve
+
})
});
});
+63
__tests__/wonka_thelpers.re
···
+
open Wonka_types;
+
+
let testWithListenable = operator => {
+
let sink = ref((_: signalT(int)) => ());
+
let signals = [||];
+
let source = x => {
+
sink := x;
+
x(Start(signal => {
+
ignore(Js.Array.push(signal, signals))
+
}));
+
};
+
+
let talkback = ref((_: talkbackT) => ());
+
let res = [||];
+
operator(source)(signal => {
+
switch (signal) {
+
| Start(x) => talkback := x
+
| _ => ignore(Js.Array.push(signal, res))
+
}
+
});
+
+
Js.Promise.make((~resolve, ~reject as _) => {
+
sink^(Push(1));
+
ignore(Js.Global.setTimeout(() => {
+
sink^(Push(2));
+
ignore(Js.Global.setTimeout(() => {
+
sink^(End);
+
ignore(Js.Global.setTimeout(() => {
+
[@bs] resolve((signals, res));
+
}, 0));
+
}, 0));
+
}, 0));
+
})
+
};
+
+
let testTalkbackEnd = operator => {
+
let sink = ref((_: signalT(int)) => ());
+
let signals: array(talkbackT) = [||];
+
let source = x => {
+
x(Start(signal => ignore(Js.Array.push(signal, signals))));
+
sink := x;
+
};
+
+
let talkback = ref((_: talkbackT) => ());
+
let res = [||];
+
operator(source)(signal => {
+
switch (signal) {
+
| Start(x) => talkback := x
+
| _ => ignore(Js.Array.push(signal, res))
+
}
+
});
+
+
Js.Promise.make((~resolve, ~reject as _) => {
+
sink^(Push(1));
+
ignore(Js.Global.setTimeout(() => {
+
let end_: talkbackT = End;
+
talkback^(end_);
+
ignore(Js.Global.setTimeout(() => {
+
[@bs] resolve((signals, res));
+
}, 0));
+
}, 0));
+
})
+
};
+37 -23
src/wonka.re
···
sourceA(signal => {
switch (signal, state.lastValB) {
| (Start(tb), _) => state.talkbackA = tb
-
| (Push(a), None) => state.lastValA = Some(a)
+
| (Push(a), None) => {
+
state.lastValA = Some(a);
+
state.gotSignal = false;
+
}
| (Push(a), Some(b)) when !state.ended => {
state.lastValA = Some(a);
state.gotSignal = false;
···
}
| (End, _) when state.endCounter < 1 =>
state.endCounter = state.endCounter + 1
-
| (End, _) => {
-
print_endline("end");
-
sink(End)
+
| (End, _) when !state.ended => {
+
state.ended = true;
+
sink(End);
}
| _ => ()
}
···
sourceB(signal => {
switch (signal, state.lastValA) {
| (Start(tb), _) => state.talkbackB = tb
-
| (Push(b), None) => state.lastValB = Some(b)
+
| (Push(b), None) => {
+
state.lastValB = Some(b);
+
state.gotSignal = false;
+
}
| (Push(b), Some(a)) when !state.ended => {
state.lastValB = Some(b);
state.gotSignal = false;
···
}
| (End, _) when state.endCounter < 1 =>
state.endCounter = state.endCounter + 1
-
| (End, _) => sink(End)
+
| (End, _) when !state.ended => {
+
state.ended = true;
+
sink(End);
+
}
| _ => ()
}
});
sink(Start(signal => {
-
switch (signal) {
-
| End => {
-
state.ended = true;
-
state.talkbackA(End);
-
state.talkbackB(End);
-
}
-
| Pull when !state.gotSignal => {
-
state.gotSignal = true;
-
state.talkbackA(signal);
-
state.talkbackB(signal);
-
}
-
| Pull => ()
-
}
+
if (!state.ended) {
+
switch (signal) {
+
| End => {
+
state.ended = true;
+
state.talkbackA(End);
+
state.talkbackB(End);
+
}
+
| Pull when !state.gotSignal => {
+
state.gotSignal = true;
+
state.talkbackA(signal);
+
state.talkbackB(signal);
+
}
+
| Pull => ()
+
}
+
};
}));
};
···
};
}
| Push(_) => ()
-
| End => {
+
| End when state.taken < max => {
state.taken = max;
sink(End)
}
+
| End => ()
}
});
···
talkback := tb;
sink(signal);
}
-
| End => {
+
| End when !ended^ => {
ended := true;
sink(End);
}
+
| End => ()
| Push(x) when !ended^ => {
if (!predicate(x)) {
ended := true;
···
}
});
}
-
| End => {
-
if (!state.ended) state.notifierTalkback(End);
+
| End when !state.ended => {
+
state.notifierTalkback(End);
state.ended = true;
sink(End);
}
+
| End => ()
| Push(_) when !state.ended => sink(signal)
| Push(_) => ()
}