Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix skipUntil pulling and add early closing (#54)

* Rename gotSignal to pulled in skipUntil

* Fix pulling behaviour of skipUntil

* Close source when notifier will never emit

When the notifier ends before it emits any value
it's safe to close the source as it can never emit
any values anymore.

* Fix active pushing

Pull from notifier source on start so it has a chance
to send a value before the source pushes its first value.

+26 -21
src/wonka_operators.re
···
);
type skipUntilStateT = {
-
mutable skip: bool,
-
mutable ended: bool,
-
mutable gotSignal: bool,
mutable sourceTalkback: (. talkbackT) => unit,
mutable notifierTalkback: (. talkbackT) => unit,
+
mutable skip: bool,
+
mutable pulled: bool,
+
mutable ended: bool,
};
[@genType]
···
curry(source =>
curry(sink => {
let state: skipUntilStateT = {
-
skip: true,
-
ended: false,
-
gotSignal: false,
sourceTalkback: talkbackPlaceholder,
notifierTalkback: talkbackPlaceholder,
+
skip: true,
+
pulled: false,
+
ended: false,
};
source((. signal) =>
···
| Start(innerTb) =>
state.notifierTalkback = innerTb;
innerTb(. Pull);
-
tb(. Pull);
| Push(_) =>
state.skip = false;
state.notifierTalkback(. Close);
+
| End when state.skip =>
+
state.ended = true;
+
state.sourceTalkback(. Close);
| End => ()
}
);
-
| Push(_) when state.skip && !state.ended =>
-
state.sourceTalkback(. Pull)
-
| Push(_) when !state.ended =>
-
state.gotSignal = false;
+
| Push(_) when !state.skip && !state.ended =>
+
state.pulled = false;
sink(. signal);
| Push(_) => ()
| End =>
···
sink(.
Start(
(. signal) =>
-
switch (signal) {
-
| Close =>
-
if (state.skip) {
-
state.notifierTalkback(. Close);
+
if (!state.ended) {
+
switch (signal) {
+
| Close =>
+
state.ended = true;
+
state.sourceTalkback(. Close);
+
if (state.skip) {
+
state.notifierTalkback(. Close);
+
};
+
| Pull when !state.pulled =>
+
state.pulled = true;
+
if (state.skip) {
+
state.notifierTalkback(. Pull);
+
};
+
state.sourceTalkback(. Pull);
+
| Pull => ()
};
-
state.ended = true;
-
state.sourceTalkback(. Close);
-
| Pull when !state.gotSignal && !state.ended =>
-
state.gotSignal = true;
-
state.sourceTalkback(. Pull);
-
| Pull => ()
},
),
);
+4 -3
src/wonka_operators.test.ts
···
describe('skipUntil', () => {
const noop = operators.skipUntil(sources.fromValue(null));
-
// TODO: passesPassivePull(noop);
-
// TODO: passesActivePush(noop);
-
// TODO: passesSinkClose(noop);
+
passesPassivePull(noop);
+
passesActivePush(noop);
+
passesSinkClose(noop);
passesSourceEnd(noop);
passesSingleStart(noop);
passesAsyncSequence(noop);
+
passesStrictEnd(noop);
it('skips values until the notifier source emits', () => {
const { source: notifier$, next: notify } = sources.makeSubject();