Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Uncurry most external API predicates/transform callbacks

+17 -17
__tests__/wonka_test.re
···
let nums = [||];
let talkback = ref((._: Wonka_types.talkbackT) => ());
-
Wonka.map((_) => {
let res = num^;
num := num^ + 1;
res
···
});
testPromise("follows the spec for listenables", () => {
-
Wonka_thelpers.testWithListenable(Wonka.map(x => x))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
-
Wonka_thelpers.testTalkbackEnd(Wonka.map(x => x))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let nums = [||];
let talkback = ref((._: Wonka_types.talkbackT) => ());
-
Wonka.filter(x => x mod 2 === 0, sink => {
sink(.Start((.signal) => {
switch (signal) {
| Pull => {
···
});
testPromise("follows the spec for listenables", () => {
-
Wonka_thelpers.testWithListenable(Wonka.filter((_) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
});
testPromise("follows the spec for listenables when filtering", () => {
-
Wonka_thelpers.testWithListenable(Wonka.filter((_) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| Pull, Pull |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
-
Wonka_thelpers.testTalkbackEnd(Wonka.filter((_) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
-
let source = Wonka.takeWhile(x => x <= 2, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull => {
let i = num^;
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
-
let source = Wonka.takeWhile(x => x <= 5, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull => {
let i = num^;
···
});
testPromise("follows the spec for listenables", () => {
-
Wonka_thelpers.testWithListenable(Wonka.takeWhile((_) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
testPromise("follows the spec for listenables when ending the source", () => {
let end_: talkbackT = Close;
-
Wonka_thelpers.testWithListenable(Wonka.takeWhile((_) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
-
Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((_) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
-
let source = Wonka.skipWhile(x => x <= 2, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull when num^ <= 4 => {
let i = num^;
···
});
testPromise("follows the spec for listenables", () => {
-
Wonka_thelpers.testWithListenable(Wonka.skipWhile((_) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
});
testPromise("follows the spec for listenables when skipping the source", () => {
-
Wonka_thelpers.testWithListenable(Wonka.skipWhile((_) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| Pull, Pull |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
-
Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((_) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
input
|> Wonka.fromArray
-
|> Wonka.map(x => string_of_int(x))
|> Wonka.forEach((.x) => ignore(Js.Array.push(x, actual)));
expect(output) |> toEqual(output)
···
let nums = [||];
let talkback = ref((._: Wonka_types.talkbackT) => ());
+
Wonka.map((._) => {
let res = num^;
num := num^ + 1;
res
···
});
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.map((.x) => x))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
+
Wonka_thelpers.testTalkbackEnd(Wonka.map((.x) => x))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let nums = [||];
let talkback = ref((._: Wonka_types.talkbackT) => ());
+
Wonka.filter((.x) => x mod 2 === 0, sink => {
sink(.Start((.signal) => {
switch (signal) {
| Pull => {
···
});
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.filter((._) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
});
testPromise("follows the spec for listenables when filtering", () => {
+
Wonka_thelpers.testWithListenable(Wonka.filter((._) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| Pull, Pull |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
+
Wonka_thelpers.testTalkbackEnd(Wonka.filter((._) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
+
let source = Wonka.takeWhile((.x) => x <= 2, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull => {
let i = num^;
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
+
let source = Wonka.takeWhile((.x) => x <= 5, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull => {
let i = num^;
···
});
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
testPromise("follows the spec for listenables when ending the source", () => {
let end_: talkbackT = Close;
+
Wonka_thelpers.testWithListenable(Wonka.takeWhile((._) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
+
Wonka_thelpers.testTalkbackEnd(Wonka.takeWhile((._) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
let talkback = ref((._: Wonka_types.talkbackT) => ());
let num = ref(1);
+
let source = Wonka.skipWhile((.x) => x <= 2, sink => sink(.Start((.signal) => {
switch (signal) {
| Pull when num^ <= 4 => {
let i = num^;
···
});
testPromise("follows the spec for listenables", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([||], [| Push(1), Push(2), End |]))
···
});
testPromise("follows the spec for listenables when skipping the source", () => {
+
Wonka_thelpers.testWithListenable(Wonka.skipWhile((._) => true))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| Pull, Pull |], [| End |]))
···
testPromise("ends itself and source when its talkback receives the End signal", () => {
let end_: talkbackT = Close;
+
Wonka_thelpers.testTalkbackEnd(Wonka.skipWhile((._) => false))
|> Js.Promise.then_(x => {
expect(x)
|> toEqual(([| end_ |], [| Push(1) |]))
···
input
|> Wonka.fromArray
+
|> Wonka.map((.x) => string_of_int(x))
|> Wonka.forEach((.x) => ignore(Js.Array.push(x, actual)));
expect(output) |> toEqual(output)
+4 -4
src/web/wonkaJs.re
···
}));
});
-
let debounce = debounceF => curry(source => curry(sink => {
let gotEndSignal = ref(false);
let id: ref(option(Js.Global.timeoutId)) = ref(None);
···
id := None;
sink(.signal);
if (gotEndSignal^) sink(.End);
-
}, debounceF(x)));
}
| End => {
gotEndSignal := true;
···
});
}));
-
let throttle = throttleF => curry(source => curry(sink => {
let skip = ref(false);
let id: ref(option(Js.Global.timeoutId)) = ref(None);
let clearTimeout = () =>
···
id := Some(Js.Global.setTimeout(() => {
id := None;
skip := false;
-
}, throttleF(x)));
sink(.signal);
}
| Push(_) => ()
···
}));
});
+
let debounce = f => curry(source => curry(sink => {
let gotEndSignal = ref(false);
let id: ref(option(Js.Global.timeoutId)) = ref(None);
···
id := None;
sink(.signal);
if (gotEndSignal^) sink(.End);
+
}, f(.x)));
}
| End => {
gotEndSignal := true;
···
});
}));
+
let throttle = f => curry(source => curry(sink => {
let skip = ref(false);
let id: ref(option(Js.Global.timeoutId)) = ref(None);
let clearTimeout = () =>
···
id := Some(Js.Global.setTimeout(() => {
id := None;
skip := false;
+
}, f(.x)));
sink(.signal);
}
| Push(_) => ()
+3 -3
src/web/wonkaJs.rei
···
/* Takes a projection to a period in milliseconds and a source, and creates
a listenable source that emits the last emitted value if no other value
has been emitted during the passed debounce period. */
-
let debounce: ('a => int, sourceT('a), sinkT('a)) => unit;
/* Takes a projection to a period in milliseconds and a source, and creates
a listenable source that ignores values after the last emitted value for
the duration of the returned throttle period. */
-
let throttle: ('a => int, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
When the notifier emits a value, it will emit the value that it most recently
···
/* Converts a stream into a promise by resolving to the last value of the
stream. */
-
let toPromise: (sourceT('a)) => Js.Promise.t('a);
···
/* Takes a projection to a period in milliseconds and a source, and creates
a listenable source that emits the last emitted value if no other value
has been emitted during the passed debounce period. */
+
let debounce: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
/* Takes a projection to a period in milliseconds and a source, and creates
a listenable source that ignores values after the last emitted value for
the duration of the returned throttle period. */
+
let throttle: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
When the notifier emits a value, it will emit the value that it most recently
···
/* Converts a stream into a promise by resolving to the last value of the
stream. */
+
let toPromise: sourceT('a) => Js.Promise.t('a);
+16 -16
src/wonka.re
···
};
let make = f => curry(sink => {
-
let teardown = f({
next: value => sink(.Push(value)),
complete: () => sink(.End)
});
sink(.Start((.signal) => {
switch (signal) {
-
| Close => teardown()
| Pull => ()
}
}));
···
source((.signal) => sink(.
switch (signal) {
| Start(x) => Start(x)
-
| Push(x) => Push(f(x))
| End => End
}
));
···
let filter = f => curry(source => curry(sink => {
captureTalkback(source, (.signal, talkback) => {
switch (signal) {
-
| Push(x) when !f(x) => talkback(.Pull)
| _ => sink(.signal)
}
});
···
tb(.Pull);
}
| Push(x) when !state.ended => {
-
applyInnerSource(f(x));
state.outerTalkback(.Pull);
}
| Push(_) => ()
···
}));
}));
-
let merge = sources => mergeMap(identity, fromArray(sources));
-
let mergeAll = source => mergeMap(identity, source);
let flatten = mergeAll;
type concatMapStateT('a) = {
···
state.innerTalkback = talkbackPlaceholder;
switch (Rebel.MutableQueue.pop(state.inputQueue)) {
-
| Some(input) => applyInnerSource(f(input))
| None when state.ended => sink(.End)
| None => ()
};
···
if (state.innerActive) {
Rebel.MutableQueue.add(state.inputQueue, x);
} else {
-
applyInnerSource(f(x));
}
state.outerTalkback(.Pull);
···
}));
}));
-
let concatAll = source => concatMap(identity, source);
-
let concat = sources => concatMap(identity, fromArray(sources));
type switchMapStateT('a) = {
mutable outerTalkback: (.talkbackT) => unit,
···
state.innerTalkback(.Close);
state.innerTalkback = talkbackPlaceholder;
}
-
applyInnerSource(f(x));
state.outerTalkback(.Pull);
}
| Push(_) => ()
···
});
}));
-
let takeWhile = predicate => curry(source => curry(sink => {
let ended = ref(false);
let talkback = ref(talkbackPlaceholder);
···
}
| End => ()
| Push(x) when !ended^ => {
-
if (!predicate(x)) {
ended := true;
sink(.End);
talkback^(.Close);
···
});
}));
-
let skipWhile = predicate => curry(source => curry(sink => {
let skip = ref(true);
captureTalkback(source, (.signal, talkback) => {
switch (signal) {
| Push(x) when skip^ => {
-
if (predicate(x)) {
talkback(.Pull);
} else {
skip := false;
···
};
let make = f => curry(sink => {
+
let teardown = f(.{
next: value => sink(.Push(value)),
complete: () => sink(.End)
});
sink(.Start((.signal) => {
switch (signal) {
+
| Close => teardown(.)
| Pull => ()
}
}));
···
source((.signal) => sink(.
switch (signal) {
| Start(x) => Start(x)
+
| Push(x) => Push(f(.x))
| End => End
}
));
···
let filter = f => curry(source => curry(sink => {
captureTalkback(source, (.signal, talkback) => {
switch (signal) {
+
| Push(x) when !f(.x) => talkback(.Pull)
| _ => sink(.signal)
}
});
···
tb(.Pull);
}
| Push(x) when !state.ended => {
+
applyInnerSource(f(.x));
state.outerTalkback(.Pull);
}
| Push(_) => ()
···
}));
}));
+
let merge = sources => mergeMap((.x) => x, fromArray(sources));
+
let mergeAll = source => mergeMap((.x) => x, source);
let flatten = mergeAll;
type concatMapStateT('a) = {
···
state.innerTalkback = talkbackPlaceholder;
switch (Rebel.MutableQueue.pop(state.inputQueue)) {
+
| Some(input) => applyInnerSource(f(.input))
| None when state.ended => sink(.End)
| None => ()
};
···
if (state.innerActive) {
Rebel.MutableQueue.add(state.inputQueue, x);
} else {
+
applyInnerSource(f(.x));
}
state.outerTalkback(.Pull);
···
}));
}));
+
let concatAll = source => concatMap((.x) => x, source);
+
let concat = sources => concatMap((.x) => x, fromArray(sources));
type switchMapStateT('a) = {
mutable outerTalkback: (.talkbackT) => unit,
···
state.innerTalkback(.Close);
state.innerTalkback = talkbackPlaceholder;
}
+
applyInnerSource(f(.x));
state.outerTalkback(.Pull);
}
| Push(_) => ()
···
});
}));
+
let takeWhile = f => curry(source => curry(sink => {
let ended = ref(false);
let talkback = ref(talkbackPlaceholder);
···
}
| End => ()
| Push(x) when !ended^ => {
+
if (!f(.x)) {
ended := true;
sink(.End);
talkback^(.Close);
···
});
}));
+
let skipWhile = f => curry(source => curry(sink => {
let skip = ref(true);
captureTalkback(source, (.signal, talkback) => {
switch (signal) {
| Push(x) when skip^ => {
+
if (f(.x)) {
talkback(.Pull);
} else {
skip := false;
+8 -8
src/wonka.rei
···
for constructing any kind of asynchronous stream. The return
callback from the passed observer function will be called when
the stream is closed or ends */
-
let make: (observerT('a) => (unit => unit), sinkT('a)) => unit;
/* Accepts a list and creates a pullable source for that list.
The source will emit events when being pulled until the list
···
and creates a sink & source.
All values that it receives will be transformed using the mapping
function and emitted on the new source */
-
let map: ('a => 'b, sourceT('a), sinkT('b)) => unit;
/* Takes a predicate function returning a boolean, and a source,
and creates a sink & source.
All values that it receives will be filtered using the predicate,
and only truthy values will be passed on to the new source.
The sink will attempt to pull a new value when one was filtered. */
-
let filter: ('a => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a reducer function, a seed value, and a source, and creates
a sink & source.
···
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the resulting inner source is merged into the output source. */
-
let mergeMap: ('a => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes a mapping function from one types to a source output,
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the latest inner source is merged into the output source. When
a new value comes in the previous source is dicarded. */
-
let switchMap: ('a => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes a mapping function from one types to a source output,
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the resulting inner sources are subscribed to and piped through
to the output source in order. */
-
let concatMap: ('a => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes an array of sources and creates a sink & source.
All values that the sink receives from all sources will be passed through
···
It will emit values that the sink receives until the predicate returns false
for a value, at which point it will end the source and the returned, new
source. */
-
let takeWhile: ('a => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
It will not emit values that the sink receives until the notifier source
···
It will not emit values that the sink receives until the passed predicate
returns false for a value, at which point it will start acting like a noop
operator, passing through every signal. */
-
let skipWhile: ('a => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
It will not emit values that the sink receives until the notifier source
···
for constructing any kind of asynchronous stream. The return
callback from the passed observer function will be called when
the stream is closed or ends */
+
let make: ((.observerT('a)) => teardownT, sinkT('a)) => unit;
/* Accepts a list and creates a pullable source for that list.
The source will emit events when being pulled until the list
···
and creates a sink & source.
All values that it receives will be transformed using the mapping
function and emitted on the new source */
+
let map: ((.'a) => 'b, sourceT('a), sinkT('b)) => unit;
/* Takes a predicate function returning a boolean, and a source,
and creates a sink & source.
All values that it receives will be filtered using the predicate,
and only truthy values will be passed on to the new source.
The sink will attempt to pull a new value when one was filtered. */
+
let filter: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a reducer function, a seed value, and a source, and creates
a sink & source.
···
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the resulting inner source is merged into the output source. */
+
let mergeMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes a mapping function from one types to a source output,
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the latest inner source is merged into the output source. When
a new value comes in the previous source is dicarded. */
+
let switchMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes a mapping function from one types to a source output,
and a source, and creates a sink & source.
The mapping function is called with each value it receives and
the resulting inner sources are subscribed to and piped through
to the output source in order. */
+
let concatMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
/* Takes an array of sources and creates a sink & source.
All values that the sink receives from all sources will be passed through
···
It will emit values that the sink receives until the predicate returns false
for a value, at which point it will end the source and the returned, new
source. */
+
let takeWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
It will not emit values that the sink receives until the notifier source
···
It will not emit values that the sink receives until the passed predicate
returns false for a value, at which point it will start acting like a noop
operator, passing through every signal. */
+
let skipWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
/* Takes a notifier source and an input source, and creates a sink & source.
It will not emit values that the sink receives until the notifier source
-2
src/wonka_helpers.re
···
open Wonka_types;
-
external identity: 'a => 'a = "%identity";
-
let talkbackPlaceholder = (._: talkbackT) => ();
let captureTalkback = (
···
open Wonka_types;
let talkbackPlaceholder = (._: talkbackT) => ();
let captureTalkback = (
+2
src/wonka_types.re
···
type sinkT('a) = (.signalT('a)) => unit;
type sourceT('a) = sinkT('a) => unit;
type subscriptionT = {
unsubscribe: unit => unit
};
···
type sinkT('a) = (.signalT('a)) => unit;
type sourceT('a) = sinkT('a) => unit;
+
type teardownT = (.unit) => unit;
+
type subscriptionT = {
unsubscribe: unit => unit
};