Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add buffer operator (#33)

* Add initial buffer implementation

* Add tests for buffer operator

+58
__tests__/wonka_test.re
···
)
);
});
});
describe("sink factories", () => {
···
)
);
});
+
+
describe("buffer", () => {
+
open Expect;
+
open! Expect.Operators;
+
+
beforeEach(() => Jest.useFakeTimers());
+
afterEach(() => Jest.useRealTimers());
+
+
it("should buffer values and emit them on each notifier tick", () => {
+
let a = Wonka.interval(50);
+
+
let talkback = ref((. _: Wonka_types.talkbackT) => ());
+
let signals = [||];
+
+
let source = Wonka.buffer(Wonka.interval(100), a) |> Wonka.take(2);
+
+
source((. signal) =>
+
switch (signal) {
+
| Start(x) =>
+
talkback := x;
+
x(. Pull);
+
| Push(_) =>
+
ignore(Js.Array.push(signal, signals));
+
talkback^(. Pull);
+
| End => ignore(Js.Array.push(signal, signals))
+
}
+
);
+
+
Jest.runTimersToTime(400);
+
+
expect(signals) == [|Push([|0, 1|]), Push([|2, 3|]), End|];
+
});
+
+
it("should end when the notifier ends", () => {
+
let a = Wonka.interval(50) |> Wonka.take(3);
+
+
let talkback = ref((. _: Wonka_types.talkbackT) => ());
+
let signals = [||];
+
+
let source = Wonka.buffer(Wonka.interval(100), a);
+
+
source((. signal) =>
+
switch (signal) {
+
| Start(x) =>
+
talkback := x;
+
x(. Pull);
+
| Push(_) =>
+
ignore(Js.Array.push(signal, signals));
+
talkback^(. Pull);
+
| End => ignore(Js.Array.push(signal, signals))
+
}
+
);
+
+
Jest.runTimersToTime(400);
+
+
expect(signals) == [|Push([|0, 1|]), Push([|2|]), End|];
+
});
+
});
});
describe("sink factories", () => {
+3
src/operators/wonka_operator_buffer.d.ts
···
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const buffer: <A>(signal: Source<any>) => Operator<A, A[]>;
+72
src/operators/wonka_operator_buffer.re
···
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type bufferStateT('a) = {
+
mutable buffer: Rebel.MutableQueue.t('a),
+
mutable sourceTalkback: (. talkbackT) => unit,
+
mutable notifierTalkback: (. talkbackT) => unit,
+
mutable ended: bool,
+
};
+
+
let buffer = (notifier: sourceT('a)) =>
+
curry((source: sourceT('b)) =>
+
curry((sink: sinkT(array('b))) => {
+
let state: bufferStateT('b) = {
+
buffer: Rebel.MutableQueue.make(),
+
sourceTalkback: talkbackPlaceholder,
+
notifierTalkback: talkbackPlaceholder,
+
ended: false,
+
};
+
+
source((. signal) =>
+
switch (signal) {
+
| Start(tb) =>
+
state.sourceTalkback = tb;
+
+
notifier((. signal) =>
+
switch (signal) {
+
| Start(tb) =>
+
state.notifierTalkback = tb;
+
state.notifierTalkback(. Pull);
+
| Push(_) when !state.ended =>
+
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
state.buffer = Rebel.MutableQueue.make();
+
state.notifierTalkback(. Pull);
+
| Push(_) => ()
+
| End when !state.ended =>
+
state.ended = true;
+
state.sourceTalkback(. Close);
+
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
sink(. End);
+
| End => ()
+
}
+
);
+
| Push(value) when !state.ended =>
+
Rebel.MutableQueue.add(state.buffer, value);
+
state.sourceTalkback(. Pull);
+
| Push(_) => ()
+
| End when !state.ended =>
+
state.ended = true;
+
state.notifierTalkback(. Close);
+
sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));
+
sink(. End);
+
| End => ()
+
}
+
);
+
+
sink(.
+
Start(
+
(. signal) =>
+
if (!state.ended) {
+
switch (signal) {
+
| Close =>
+
state.ended = true;
+
state.sourceTalkback(. Close);
+
state.notifierTalkback(. Close);
+
| Pull => state.sourceTalkback(. Pull)
+
};
+
},
+
),
+
);
+
})
+
);
+3
src/operators/wonka_operator_buffer.rei
···
···
+
open Wonka_types;
+
+
let buffer: (sourceT('a), sourceT('b), sinkT(array('b))) => unit;
+2 -2
src/operators/wonka_operator_takeUntil.re
···
innerTb(. Pull);
| Push(_) =>
state.ended = true;
-
state.notifierTalkback(. Close);
state.sourceTalkback(. Close);
sink(. End);
| End => ()
}
);
| End when !state.ended =>
-
state.notifierTalkback(. Close);
state.ended = true;
sink(. End);
| End => ()
| Push(_) when !state.ended => sink(. signal)
···
if (!state.ended) {
switch (signal) {
| Close =>
state.sourceTalkback(. Close);
state.notifierTalkback(. Close);
| Pull => state.sourceTalkback(. Pull)
···
innerTb(. Pull);
| Push(_) =>
state.ended = true;
state.sourceTalkback(. Close);
sink(. End);
| End => ()
}
);
| End when !state.ended =>
state.ended = true;
+
state.notifierTalkback(. Close);
sink(. End);
| End => ()
| Push(_) when !state.ended => sink(. signal)
···
if (!state.ended) {
switch (signal) {
| Close =>
+
state.ended = true;
state.sourceTalkback(. Close);
state.notifierTalkback(. Close);
| Pull => state.sourceTalkback(. Pull)
+1
src/wonka.d.ts
···
export * from './sources/wonka_source_primitives';
/* operators */
export * from './operators/wonka_operator_combine';
export * from './operators/wonka_operator_concatMap';
export * from './operators/wonka_operator_filter';
···
export * from './sources/wonka_source_primitives';
/* operators */
+
export * from './operators/wonka_operator_buffer';
export * from './operators/wonka_operator_combine';
export * from './operators/wonka_operator_concatMap';
export * from './operators/wonka_operator_filter';
+1
src/wonka_operators.re
···
include Wonka_operator_combine;
include Wonka_operator_concatMap;
include Wonka_operator_filter;
···
+
include Wonka_operator_buffer;
include Wonka_operator_combine;
include Wonka_operator_concatMap;
include Wonka_operator_filter;