Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add strict ending to onEnd and onPush (#61)

Since these are side-effects that may be used
to reinsert more signals into the source chain,
through subjects for instance, they need to
protect themselves from dangerous signal loops.

+44 -18
src/wonka_operators.re
···
let onEnd = (f: (. unit) => unit): operatorT('a, 'a) =>
curry(source =>
curry(sink => {
+
let ended = ref(false);
source((. signal) =>
switch (signal) {
| Start(talkback) =>
sink(.
Start(
-
(. signal) => {
-
switch (signal) {
-
| Close => f(.)
-
| _ => ()
-
};
-
talkback(. signal);
-
},
+
(. signal) =>
+
if (! ended^) {
+
switch (signal) {
+
| Pull => talkback(. signal)
+
| Close =>
+
ended := true;
+
talkback(. signal);
+
f(.);
+
};
+
},
),
)
-
| End =>
+
| Push(_) when ! ended^ => sink(. signal)
+
| Push(_) => ()
+
| End when ! ended^ =>
+
ended := true;
sink(. signal);
f(.);
-
| _ => sink(. signal)
+
| End => ()
}
-
)
+
);
})
);
[@genType]
let onPush = (f: (. 'a) => unit): operatorT('a, 'a) =>
curry(source =>
-
curry(sink =>
+
curry(sink => {
+
let ended = ref(false);
source((. signal) => {
switch (signal) {
-
| Push(x) => f(. x)
-
| _ => ()
-
};
-
-
sink(. signal);
-
})
-
)
+
| Start(talkback) =>
+
sink(.
+
Start(
+
(. signal) =>
+
if (! ended^) {
+
switch (signal) {
+
| Pull => talkback(. signal)
+
| Close =>
+
ended := true;
+
talkback(. signal);
+
};
+
},
+
),
+
)
+
| Push(x) when ! ended^ =>
+
f(. x);
+
sink(. signal);
+
| Push(_) => ()
+
| End when ! ended^ =>
+
ended := true;
+
sink(. signal);
+
| End => ()
+
}
+
});
+
})
);
[@genType]
+2
src/wonka_operators.test.ts
···
passesActivePush(noop);
passesSinkClose(noop);
passesSourceEnd(noop);
+
passesStrictEnd(noop);
passesSingleStart(noop);
passesAsyncSequence(noop);
···
passesActivePush(noop);
passesSinkClose(noop);
passesSourceEnd(noop);
+
passesStrictEnd(noop);
passesSingleStart(noop);
passesAsyncSequence(noop);