Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix edge cases of buffer operator (#51)

* Fix passesPassivePull for buffer operator

* Fix passesSinkClose and passesStrictEnd for buffer operator

* Fix passesActivePush and passesSourceEnd for buffer operator

+23 -10
src/wonka_operators.re
···
mutable buffer: Rebel.MutableQueue.t('a),
mutable sourceTalkback: (. talkbackT) => unit,
mutable notifierTalkback: (. talkbackT) => unit,
+
mutable pulled: bool,
mutable ended: bool,
};
···
buffer: Rebel.MutableQueue.make(),
sourceTalkback: talkbackPlaceholder,
notifierTalkback: talkbackPlaceholder,
+
pulled: false,
ended: false,
};
···
notifier((. signal) =>
switch (signal) {
-
| Start(tb) =>
-
state.notifierTalkback = tb;
-
state.notifierTalkback(. Pull);
+
| Start(tb) => state.notifierTalkback = tb
| Push(_) when !state.ended =>
-
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
-
state.buffer = Rebel.MutableQueue.make();
-
state.notifierTalkback(. Pull);
+
if (Rebel.MutableQueue.size(state.buffer) > 0) {
+
let buffer = state.buffer;
+
state.buffer = Rebel.MutableQueue.make();
+
sink(. Push(Rebel.MutableQueue.toArray(buffer)));
+
}
| Push(_) => ()
| End when !state.ended =>
state.ended = true;
state.sourceTalkback(. Close);
-
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
if (Rebel.MutableQueue.size(state.buffer) > 0) {
+
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
};
sink(. End);
| End => ()
}
);
| Push(value) when !state.ended =>
Rebel.MutableQueue.add(state.buffer, value);
-
state.sourceTalkback(. Pull);
+
state.pulled = false;
+
state.notifierTalkback(. Pull);
| Push(_) => ()
| End when !state.ended =>
state.ended = true;
state.notifierTalkback(. Close);
-
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
if (Rebel.MutableQueue.size(state.buffer) > 0) {
+
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
};
sink(. End);
| End => ()
}
···
state.ended = true;
state.sourceTalkback(. Close);
state.notifierTalkback(. Close);
-
| Pull => state.sourceTalkback(. Pull)
+
| Pull =>
+
if (!state.pulled) {
+
state.pulled = true;
+
state.sourceTalkback(. Pull);
+
state.notifierTalkback(. Pull);
+
}
+
| Pull => ()
};
},
),
+20 -12
src/wonka_operators.test.ts
···
) =>
it('responds to Pull talkback signals (spec)', () => {
let talkback = null;
+
let push = 0;
const values = [];
const source: types.sourceT<any> = sink => {
sink(deriving.start(tb => {
-
if (tb === deriving.pull)
+
if (!push && tb === deriving.pull) {
+
push++;
sink(deriving.push(0));
+
}
}));
};
···
});
describe('buffer', () => {
-
const noop = operators.buffer(
-
operators.merge([
-
sources.fromValue(null),
-
sources.never
-
])
-
);
+
const valueThenNever: types.sourceT<any> = sink =>
+
sink(deriving.start(tb => {
+
if (tb === deriving.pull)
+
sink(deriving.push(null));
+
}));
-
// TODO: passesPassivePull(noop, [0]);
-
// TODO: passesActivePush(noop);
-
// TODO: passesSinkClose(noop);
-
// TODO: passesSourceEnd(noop);
+
const noop = operators.buffer(valueThenNever);
+
+
passesPassivePull(noop, [0]);
+
passesActivePush(noop, [0]);
+
passesSinkClose(noop);
+
passesSourceEnd(noop, [0]);
passesSingleStart(noop);
-
// TODO: passesStrictEnd(noop);
+
passesStrictEnd(noop);
it('emits batches of input values when a notifier emits', () => {
const { source: notifier$, next: notify } = sources.makeSubject();
···
notify(null);
expect(fn).toHaveBeenCalledWith([1, 2]);
+
+
next(3);
+
notify(null);
+
expect(fn).toHaveBeenCalledWith([3]);
});
});