Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Remove redundant Close/End guards from operators and add them to sources (#55)

* Remove strict end test from map operator

The map operator doesn't need to guard
against signals after End since it's a
safe and repeatable effect.

* Remove strict end guard from onEnd operator

The onEnd operator does run a side-effect, but
it can rely on the convention of no signals being
emitted after End/Close.

* Add Close guards to all sources

Since we now assume that sources behave
well and operators don't have to guard
Close events, unless they're dealing with
multiple sources and side-effects, the
sources can be amended instead.

+5 -14
src/wonka_operators.re
···
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
curry(source =>
curry(sink => {
-
let ended = ref(false);
-
source((. signal) =>
switch (signal) {
| Start(talkback) =>
···
Start(
(. signal) => {
switch (signal) {
-
| Close when ! ended^ =>
-
ended := true;
-
f(.);
-
| Close
-
| Pull => ()
+
| Close => f(.)
+
| _ => ()
};
-
talkback(. signal);
},
),
)
| End =>
-
if (! ended^) {
-
ended := true;
-
sink(. signal);
-
f(.);
-
}
+
sink(. signal);
+
f(.);
| _ => sink(. signal)
}
-
);
+
)
})
);
-1
src/wonka_operators.test.ts
···
passesSinkClose(noop);
passesSourceEnd(noop);
passesSingleStart(noop);
-
// TODO: passesStrictEnd(noop);
passesAsyncSequence(noop);
it('maps over values given a transform function', () => {
+40 -11
src/wonka_sources.re
···
ended := true;
sink(. Push(x));
sink(. End);
-
| _ => ()
+
| Pull => ()
+
| Close => ended := true
},
),
);
});
+
type makeStateT = {
+
mutable teardown: (. unit) => unit,
+
mutable ended: bool,
+
};
+
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
curry(sink => {
-
let teardown = ref((.) => ());
+
let state: makeStateT = {teardown: (.) => (), ended: false};
sink(.
Start(
(. signal) =>
switch (signal) {
-
| Close => teardown^(.)
-
| Pull => ()
+
| Close when !state.ended =>
+
state.ended = true;
+
state.teardown(.);
+
| _ => ()
},
),
);
-
teardown :=
+
state.teardown =
f(. {
-
next: value => sink(. Push(value)),
-
complete: () => sink(. End),
+
next: value =>
+
if (!state.ended) {
+
sink(. Push(value));
+
},
+
complete: () =>
+
if (!state.ended) {
+
state.ended = true;
+
sink(. End);
+
},
});
});
···
sink(.
Start(
(. signal) =>
-
if (signal === Close) {
-
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
+
switch (signal) {
+
| Close =>
+
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
+
| _ => ()
},
),
);
···
[@genType]
let empty = (sink: sinkT('a)): unit => {
-
sink(. Start(talkbackPlaceholder));
-
sink(. End);
+
let ended = ref(false);
+
sink(.
+
Start(
+
(. signal) => {
+
switch (signal) {
+
| Close => ended := true
+
| _ => ()
+
}
+
},
+
),
+
);
+
if (! ended^) {
+
sink(. End);
+
};
};
[@genType]