Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add publish consumer

Changed files
+54 -25
__tests__
src
+3 -3
__tests__/wonka_test.re
···
}));
};
-
Wonka.forEach(x => ignore(Js.Array.push(x, nums)), source);
+
Wonka.forEach((.x) => ignore(Js.Array.push(x, nums)), source);
expect(nums) |> toEqual([| 0, 1, 2, 3 |])
});
});
···
sink(.Start(Wonka_helpers.talkbackPlaceholder));
};
-
let { unsubscribe } = Wonka.subscribe(x => ignore(Js.Array.push(x, nums)), source);
+
let { unsubscribe } = Wonka.subscribe((.x) => ignore(Js.Array.push(x, nums)), source);
push^();
push^();
···
input
|> Wonka.fromArray
|> Wonka.map(x => string_of_int(x))
-
|> Wonka.forEach(x => ignore(Js.Array.push(x, actual)));
+
|> Wonka.forEach((.x) => ignore(Js.Array.push(x, actual)));
expect(output) |> toEqual(output)
});
+1
src/wonka.d.ts
···
export const skipWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
export const skipUntil: <A>(signal: Source<any>) => Operator<A, A>;
+
export const publish: <A>(source: Source<A>) => Subscription;
export const forEach: <A>(f: (x: A) => void) => (source: Source<A>) => void;
export const subscribe: <A>(f: (x: A) => void) => (source: Source<A>) => Subscription;
+44 -19
src/wonka.re
···
let tap = f => curry(source => curry(sink => {
source((.signal) => {
switch (signal) {
-
| Push(x) => f(x)
+
| Push(x) => f(.x)
| _ => ()
};
···
}));
}));
-
let forEach = f => curry(source => {
-
captureTalkback(source, (.signal, talkback) => {
+
type publishStateT = {
+
mutable talkback: (.talkbackT) => unit,
+
mutable ended: bool
+
};
+
+
let publish = source => {
+
let state: publishStateT = {
+
talkback: talkbackPlaceholder,
+
ended: false
+
};
+
+
source((.signal) => {
switch (signal) {
-
| Start(_) => talkback(.Pull)
-
| Push(x) => {
-
f(x);
-
talkback(.Pull);
+
| Start(x) => {
+
state.talkback = x;
+
x(.Pull);
}
-
| End => ()
+
| Push(_) => if (!state.ended) state.talkback(.Pull);
+
| End => state.ended = true;
}
});
-
});
+
+
{
+
unsubscribe: () =>
+
if (!state.ended) {
+
state.ended = true;
+
state.talkback(.Close);
+
}
+
}
+
};
let subscribe = f => curry(source => {
-
let talkback = ref(talkbackPlaceholder);
-
let ended = ref(false);
+
let state: publishStateT = {
+
talkback: talkbackPlaceholder,
+
ended: false
+
};
source((.signal) => {
switch (signal) {
| Start(x) => {
-
talkback := x;
+
state.talkback = x;
x(.Pull);
}
-
| Push(x) when !ended^ => {
-
f(x);
-
talkback^(.Pull);
+
| Push(x) when !state.ended => {
+
f(.x);
+
state.talkback(.Pull);
}
-
| _ => ()
+
| Push(_) => ()
+
| End => state.ended = true;
}
});
{
unsubscribe: () =>
-
if (!ended^) {
-
ended := true;
-
talkback^(.Close);
+
if (!state.ended) {
+
state.ended = true;
+
state.talkback(.Close);
}
}
});
+
+
let forEach = f => curry(source => {
+
ignore(subscribe(f, source));
+
});
+6 -3
src/wonka.rei
···
/* Takes a callback and a source, and creates a sink & source.
The callback will be called for each value that it receives */
-
let tap: ('a => unit, sourceT('a), sinkT('a)) => unit;
+
let tap: ((.'a) => unit, sourceT('a), sinkT('a)) => unit;
/* Takes a mapping function from one type to another, and a source,
and creates a sink & source.
···
/* -- sink factories */
+
/* Accepts a source and returns a subscription, but does otherwise not surface values */
+
let publish: sourceT('a) => subscriptionT;
+
/* Takes a function and a source, and creates a sink.
The function will be called for each value that the sink receives.
The sink will attempt to pull new values as values come in, until
the source ends. */
-
let forEach: ('a => unit, sourceT('a)) => unit;
+
let forEach: ((.'a) => unit, sourceT('a)) => unit;
/* Similar to the `forEach` sink factory, but returns an anonymous function
that when called will end the stream immediately.
Ending the stream will propagate an End signal upwards to the root source. */
-
let subscribe: ('a => unit, sourceT('a)) => subscriptionT;
+
let subscribe: ((.'a) => unit, sourceT('a)) => subscriptionT;