Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add combine operator with tests

Changed files
+112
__tests__
src
+42
__tests__/callbag_test.re
···
expect((numsA, nums)) |> toEqual(([| 1, 1 |], [| 1, 1, 2, 2 |]));
});
});
});
describe("sink factories", () => {
···
expect((numsA, nums)) |> toEqual(([| 1, 1 |], [| 1, 1, 2, 2 |]));
});
});
+
+
describe("combine", () => {
+
open Expect;
+
+
it("combines the latest values of two sources", () => {
+
let talkback = ref((_: Callbag_types.talkbackT) => ());
+
+
let makeSource = (factor: int) => {
+
let num = ref(1);
+
+
sink => {
+
sink(Start(signal => {
+
switch (signal) {
+
| Pull => {
+
let i = num^ * factor;
+
num := num^ + 1;
+
sink(Push(i));
+
}
+
| _ => ()
+
}
+
}));
+
}
+
};
+
+
let sourceA = makeSource(1);
+
let sourceB = makeSource(2);
+
let source = Callbag.combine(sourceA, sourceB);
+
let res = [||];
+
+
source(signal => {
+
switch (signal) {
+
| Start(x) => talkback := x
+
| Push(x) => ignore(Js.Array.push(x, res))
+
| _ => ()
+
}
+
});
+
+
talkback^(Pull);
+
talkback^(Pull);
+
expect(res) |> toEqual([| (1, 2), (2, 2), (2, 4) |]);
+
});
+
});
});
describe("sink factories", () => {
+69
src/callbag.re
···
}
};
let forEach = (f, source) =>
captureTalkback(source, [@bs] (signal, talkback) => {
switch (signal) {
···
}
};
+
type combineLatestStateT('a, 'b) = {
+
mutable talkbackA: talkbackT => unit,
+
mutable talkbackB: talkbackT => unit,
+
mutable lastValA: option('a),
+
mutable lastValB: option('b),
+
mutable gotSignal: bool,
+
mutable endCounter: int,
+
mutable ended: bool,
+
};
+
+
let combine = (sourceA, sourceB, sink) => {
+
let state = {
+
talkbackA: (_: talkbackT) => (),
+
talkbackB: (_: talkbackT) => (),
+
lastValA: None,
+
lastValB: None,
+
gotSignal: false,
+
endCounter: 0,
+
ended: false
+
};
+
+
sourceA(signal => {
+
switch (signal, state.lastValB) {
+
| (Start(tb), _) => state.talkbackA = tb
+
| (Push(a), None) => state.lastValA = Some(a)
+
| (Push(a), Some(b)) when !state.ended => {
+
state.lastValA = Some(a);
+
state.gotSignal = false;
+
sink(Push((a, b)));
+
}
+
| (End, _) when state.endCounter < 2 => state.endCounter = state.endCounter + 1
+
| (End, _) => sink(End)
+
| _ => ()
+
}
+
});
+
+
sourceB(signal => {
+
switch (signal, state.lastValA) {
+
| (Start(tb), _) => state.talkbackB = tb
+
| (Push(b), None) => state.lastValB = Some(b)
+
| (Push(b), Some(a)) when !state.ended => {
+
state.lastValB = Some(b);
+
state.gotSignal = false;
+
sink(Push((a, b)));
+
}
+
| (End, _) when state.endCounter < 2 =>
+
state.endCounter = state.endCounter + 1
+
| (End, _) => sink(End)
+
| _ => ()
+
}
+
});
+
+
sink(Start(signal => {
+
switch (signal) {
+
| End => {
+
state.ended = true;
+
state.talkbackA(End);
+
state.talkbackB(End);
+
}
+
| Pull when !state.gotSignal => {
+
state.gotSignal = true;
+
state.talkbackA(signal);
+
state.talkbackB(signal);
+
}
+
| Pull => ()
+
}
+
}));
+
};
+
let forEach = (f, source) =>
captureTalkback(source, [@bs] (signal, talkback) => {
switch (signal) {
+1
src/callbag.rei
···
let scan: (('b, 'a) => 'b, 'b, (signalT('a) => unit) => unit, signalT('b) => unit) => unit;
let merge: (array((signalT('a) => unit) => unit), signalT('a) => unit) => unit;
let share: ((signalT('a) => unit) => unit, signalT('a) => unit) => unit;
let forEach: ('a => unit, (signalT('a) => unit) => unit) => unit;
let subscribe: ('a => unit, (signalT('a) => unit) => unit) => (unit => unit);
···
let scan: (('b, 'a) => 'b, 'b, (signalT('a) => unit) => unit, signalT('b) => unit) => unit;
let merge: (array((signalT('a) => unit) => unit), signalT('a) => unit) => unit;
let share: ((signalT('a) => unit) => unit, signalT('a) => unit) => unit;
+
let combine: ((signalT('a) => unit) => unit, (signalT('b) => unit) => unit, signalT(('a, 'b)) => unit) => unit;
let forEach: ('a => unit, (signalT('a) => unit) => unit) => unit;
let subscribe: ('a => unit, (signalT('a) => unit) => unit) => (unit => unit);