Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix combine for mixed cold/hot sources (#50)

When two sources are passed two combine of which
one is hot (actively sends Push signals without Pull
talkback signals) and one is cold (only sends Push
signals with Pull talkback signals) the combine
operator stalls.

It will receive a value from the hot source, but
wait for the cold source's value forever.

To prevent this, when no Pull talkback signal has
been received, one is sent to the other (presumably cold)
source proactively.

+10 -2
src/wonka_operators.re
···
| (Start(tb), _) => state.talkbackA = tb
| (Push(a), None) =>
state.lastValA = Some(a);
-
state.gotSignal = false;
+
if (!state.gotSignal) {
+
state.talkbackB(. Pull);
+
} else {
+
state.gotSignal = false;
+
};
| (Push(a), Some(b)) when !state.ended =>
state.lastValA = Some(a);
state.gotSignal = false;
···
| (Start(tb), _) => state.talkbackB = tb
| (Push(b), None) =>
state.lastValB = Some(b);
-
state.gotSignal = false;
+
if (!state.gotSignal) {
+
state.talkbackA(. Pull);
+
} else {
+
state.gotSignal = false;
+
};
| (Push(b), Some(a)) when !state.ended =>
state.lastValB = Some(b);
state.gotSignal = false;
+1 -1
src/wonka_operators.test.ts
···
const noop = (source: types.sourceT<any>) => operators.combine(sources.fromValue(0), source);
passesPassivePull(noop, [0, 0]);
-
// TODO: passesActivePush(noop, [0, 0]);
+
passesActivePush(noop, [0, 0]);
passesSinkClose(noop);
passesSourceEnd(noop, [0, 0]);
passesSingleStart(noop);