Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Test for strict close in passesStrictEnd as well (#58)

+22 -20
src/wonka_operators.re
···
innerSource((. signal) =>
switch (signal) {
-
| End =>
-
state.innerTalkbacks =
-
Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
-
if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
-
sink(. End);
-
};
| Start(tb) =>
talkback := tb;
state.innerTalkbacks =
···
sink(. Push(x));
talkback^(. Pull);
| Push(_) => ()
}
);
};
···
sink(.
Start(
(. signal) =>
-
switch (signal) {
-
| Close when !state.ended =>
-
let tbs = state.innerTalkbacks;
-
state.innerTalkbacks = Rebel.Array.makeEmpty();
-
state.outerTalkback(. signal);
-
Rebel.Array.forEach(tbs, tb => tb(. signal));
-
| Close => ()
-
| Pull when !state.ended =>
-
if (!state.outerPulled) {
-
state.outerPulled = true;
-
state.outerTalkback(. Pull);
-
};
-
Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
-
| Pull => ()
},
),
);
···
innerSource((. signal) =>
switch (signal) {
| Start(tb) =>
talkback := tb;
state.innerTalkbacks =
···
sink(. Push(x));
talkback^(. Pull);
| Push(_) => ()
+
| End when Rebel.Array.size(state.innerTalkbacks) !== 0 =>
+
state.innerTalkbacks =
+
Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
+
if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
+
sink(. End);
+
};
+
| End => ()
}
);
};
···
sink(.
Start(
(. signal) =>
+
if (!state.ended) {
+
switch (signal) {
+
| Close =>
+
let tbs = state.innerTalkbacks;
+
state.ended = true;
+
state.innerTalkbacks = Rebel.Array.makeEmpty();
+
state.outerTalkback(. signal);
+
Rebel.Array.forEach(tbs, tb => tb(. signal));
+
| Pull =>
+
if (!state.outerPulled) {
+
state.outerPulled = true;
+
state.outerTalkback(. Pull);
+
};
+
Rebel.Array.forEach(state.innerTalkbacks, tb => tb(. Pull));
+
};
},
),
);
+28 -2
src/wonka_operators.test.ts
···
This isn't a strict requirement, but some operators should ensure that
all sources are well behaved. This is particularly true for operators
that either Close sources themselves or may operate on multiple sources. */
-
const passesStrictEnd = (operator: types.operatorT<any, any>) =>
it('stops all signals after End has been received (spec: strict end)', () => {
let pulls = 0;
const signals = [];
···
expect(signals).toEqual([deriving.end()]);
expect(pulls).toBe(1);
});
/* This tests an immediately closing operator for End signals to
the sink and Close signals to the source.
···
passesSinkClose(noop);
passesSourceEnd(noop);
passesSingleStart(noop);
-
passesStrictEnd(noop);
passesAsyncSequence(noop);
const ending = operators.takeWhile(() => false);
···
This isn't a strict requirement, but some operators should ensure that
all sources are well behaved. This is particularly true for operators
that either Close sources themselves or may operate on multiple sources. */
+
const passesStrictEnd = (operator: types.operatorT<any, any>) => {
it('stops all signals after End has been received (spec: strict end)', () => {
let pulls = 0;
const signals = [];
···
expect(signals).toEqual([deriving.end()]);
expect(pulls).toBe(1);
});
+
+
it('stops all signals after Close has been received (spec: strict close)', () => {
+
const signals = [];
+
+
const source: types.sourceT<any> = sink => {
+
sink(deriving.start(tb => {
+
if (tb === deriving.close) {
+
sink(deriving.push(123));
+
}
+
}));
+
};
+
+
const sink: types.sinkT<any> = signal => {
+
if (deriving.isStart(signal)) {
+
deriving.unboxStart(signal)(deriving.close);
+
} else {
+
signals.push(signal);
+
}
+
};
+
+
operator(source)(sink);
+
+
// The Push signal should've been dropped
+
jest.runAllTimers();
+
expect(signals).toEqual([]);
+
});
+
};
/* This tests an immediately closing operator for End signals to
the sink and Close signals to the source.
···
passesSinkClose(noop);
passesSourceEnd(noop);
passesSingleStart(noop);
passesAsyncSequence(noop);
const ending = operators.takeWhile(() => false);