Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Restructure all the things

Changed files
+1572 -1488
src
operators
sinks
sources
web
+15 -4
bsconfig.json
···
"sources": [
{
"dir": "src",
-
"subdirs": [{
-
"dir": "web",
-
"backend": ["js"]
-
}]
+
"subdirs": [
+
{
+
"dir": "operators"
+
},
+
{
+
"dir": "sources"
+
},
+
{
+
"dir": "sinks"
+
},
+
{
+
"dir": "web",
+
"backend": ["js"]
+
}
+
]
},
{
"dir": "__tests__",
+1 -2
src/index.d.ts
···
-
export { Talkback, Signal, Sink, Subscription, Source, Operator, Observer, Subject } from './wonka_types';
-
export * from './pipe';
+
export * from './wonka_types';
export * from './wonka';
export * from './web/wonkaJs';
+3
src/operators/wonka_operator_combine.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const combine: <A, B>(a: Source<A>, b: Source<B>) => Source<[A, B]>;
+86
src/operators/wonka_operator_combine.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type combineStateT('a, 'b) = {
+
mutable talkbackA: (.talkbackT) => unit,
+
mutable talkbackB: (.talkbackT) => unit,
+
mutable lastValA: option('a),
+
mutable lastValB: option('b),
+
mutable gotSignal: bool,
+
mutable endCounter: int,
+
mutable ended: bool,
+
};
+
+
let combine = (sourceA, sourceB) => curry(sink => {
+
let state = {
+
talkbackA: talkbackPlaceholder,
+
talkbackB: talkbackPlaceholder,
+
lastValA: None,
+
lastValB: None,
+
gotSignal: false,
+
endCounter: 0,
+
ended: false
+
};
+
+
sourceA((.signal) => {
+
switch (signal, state.lastValB) {
+
| (Start(tb), _) => state.talkbackA = tb
+
| (Push(a), None) => {
+
state.lastValA = Some(a);
+
state.gotSignal = false;
+
}
+
| (Push(a), Some(b)) when !state.ended => {
+
state.lastValA = Some(a);
+
state.gotSignal = false;
+
sink(.Push((a, b)));
+
}
+
| (End, _) when state.endCounter < 1 =>
+
state.endCounter = state.endCounter + 1
+
| (End, _) when !state.ended => {
+
state.ended = true;
+
sink(.End);
+
}
+
| _ => ()
+
}
+
});
+
+
sourceB((.signal) => {
+
switch (signal, state.lastValA) {
+
| (Start(tb), _) => state.talkbackB = tb
+
| (Push(b), None) => {
+
state.lastValB = Some(b);
+
state.gotSignal = false;
+
}
+
| (Push(b), Some(a)) when !state.ended => {
+
state.lastValB = Some(b);
+
state.gotSignal = false;
+
sink(.Push((a, b)));
+
}
+
| (End, _) when state.endCounter < 1 =>
+
state.endCounter = state.endCounter + 1
+
| (End, _) when !state.ended => {
+
state.ended = true;
+
sink(.End);
+
}
+
| _ => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
if (!state.ended) {
+
switch (signal) {
+
| Close => {
+
state.ended = true;
+
state.talkbackA(.Close);
+
state.talkbackB(.Close);
+
}
+
| Pull when !state.gotSignal => {
+
state.gotSignal = true;
+
state.talkbackA(.signal);
+
state.talkbackB(.signal);
+
}
+
| Pull => ()
+
}
+
};
+
}));
+
});
+3
src/operators/wonka_operator_combine.rei
···
+
open Wonka_types;
+
+
let combine: (sourceT('a), sourceT('b), sinkT(('a, 'b))) => unit;
+5
src/operators/wonka_operator_concatMap.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const concatMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
+
export const concat: <A>(sources: Array<Source<A>>) => Source<A>;
+
export const concatAll: <A>(source: Source<Source<A>>) => Source<A>;
+94
src/operators/wonka_operator_concatMap.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type concatMapStateT('a) = {
+
inputQueue: Rebel.MutableQueue.t('a),
+
mutable outerTalkback: (.talkbackT) => unit,
+
mutable innerTalkback: (.talkbackT) => unit,
+
mutable innerActive: bool,
+
mutable closed: bool,
+
mutable ended: bool
+
};
+
+
let concatMap = f => curry(source => curry(sink => {
+
let state: concatMapStateT('a) = {
+
inputQueue: Rebel.MutableQueue.make(),
+
outerTalkback: talkbackPlaceholder,
+
innerTalkback: talkbackPlaceholder,
+
innerActive: false,
+
closed: false,
+
ended: false
+
};
+
+
let rec applyInnerSource = innerSource =>
+
innerSource((.signal) => {
+
switch (signal) {
+
| End => {
+
state.innerActive = false;
+
state.innerTalkback = talkbackPlaceholder;
+
+
switch (Rebel.MutableQueue.pop(state.inputQueue)) {
+
| Some(input) => applyInnerSource(f(.input))
+
| None when state.ended => sink(.End)
+
| None => ()
+
};
+
}
+
| Start(tb) => {
+
state.innerActive = true;
+
state.innerTalkback = tb;
+
tb(.Pull);
+
}
+
| Push(x) when !state.closed => {
+
sink(.Push(x));
+
state.innerTalkback(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
+
source((.signal) => {
+
switch (signal) {
+
| End when !state.ended => {
+
state.ended = true;
+
if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
+
sink(.End);
+
}
+
}
+
| End => ()
+
| Start(tb) => {
+
state.outerTalkback = tb;
+
tb(.Pull);
+
}
+
| Push(x) when !state.ended => {
+
if (state.innerActive) {
+
Rebel.MutableQueue.add(state.inputQueue, x);
+
} else {
+
applyInnerSource(f(.x));
+
}
+
+
state.outerTalkback(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Pull => if (!state.ended) state.innerTalkback(.Pull)
+
| Close when !state.ended => {
+
state.ended = true;
+
state.closed = true;
+
state.outerTalkback(.Close);
+
state.innerTalkback(.Close);
+
}
+
| Close => ()
+
}
+
}));
+
}));
+
+
let concatAll = source => concatMap((.x) => x, source);
+
+
let concat = sources => {
+
open Wonka_source_fromArray;
+
concatMap((.x) => x, fromArray(sources));
+
};
+5
src/operators/wonka_operator_concatMap.rei
···
+
open Wonka_types;
+
+
let concatMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
+
let concat: (array(sourceT('a)), sinkT('a)) => unit;
+
let concatAll: (sourceT(sourceT('a)), sinkT('a)) => unit;
+3
src/operators/wonka_operator_filter.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const filter: <A>(f: (value: A) => boolean) => Operator<A, A>;
+11
src/operators/wonka_operator_filter.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let filter = f => curry(source => curry(sink => {
+
captureTalkback(source, (.signal, talkback) => {
+
switch (signal) {
+
| Push(x) when !f(.x) => talkback(.Pull)
+
| _ => sink(.signal)
+
}
+
});
+
}));
+3
src/operators/wonka_operator_filter.rei
···
+
open Wonka_types;
+
+
let filter: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_map.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const map: <A, B>(f: (value: A) => B) => Operator<A, B>;
+11
src/operators/wonka_operator_map.re
···
+
open Wonka_types;
+
+
let map = f => curry(source => curry(sink => {
+
source((.signal) => sink(.
+
switch (signal) {
+
| Start(x) => Start(x)
+
| Push(x) => Push(f(.x))
+
| End => End
+
}
+
));
+
}));
+3
src/operators/wonka_operator_map.rei
···
+
open Wonka_types;
+
+
let map: ((.'a) => 'b, sourceT('a), sinkT('b)) => unit;
+6
src/operators/wonka_operator_mergeMap.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const mergeMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
+
export const merge: <A>(sources: Array<Source<A>>) => Source<A>;
+
export const mergeAll: <A>(source: Source<Source<A>>) => Source<A>;
+
export const flatten: <A>(source: Source<Source<A>>) => Source<A>;
+85
src/operators/wonka_operator_mergeMap.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type mergeMapStateT = {
+
mutable outerTalkback: (.talkbackT) => unit,
+
mutable innerTalkbacks: Rebel.Array.t((.talkbackT) => unit),
+
mutable ended: bool
+
};
+
+
let mergeMap = f => curry(source => curry(sink => {
+
let state: mergeMapStateT = {
+
outerTalkback: talkbackPlaceholder,
+
innerTalkbacks: Rebel.Array.makeEmpty(),
+
ended: false
+
};
+
+
let applyInnerSource = innerSource => {
+
let talkback = ref(talkbackPlaceholder);
+
+
innerSource((.signal) => {
+
switch (signal) {
+
| End => {
+
state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
+
if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
+
sink(.End);
+
}
+
}
+
| Start(tb) => {
+
talkback := tb;
+
state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb);
+
tb(.Pull);
+
}
+
| Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => {
+
sink(.Push(x));
+
talkback^(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| End when !state.ended => {
+
state.ended = true;
+
if (Rebel.Array.size(state.innerTalkbacks) === 0) {
+
sink(.End);
+
}
+
}
+
| End => ()
+
| Start(tb) => {
+
state.outerTalkback = tb;
+
tb(.Pull);
+
}
+
| Push(x) when !state.ended => {
+
applyInnerSource(f(.x));
+
state.outerTalkback(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close when !state.ended => {
+
state.ended = true;
+
state.outerTalkback(.Close);
+
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close));
+
state.innerTalkbacks = Rebel.Array.makeEmpty();
+
}
+
| Close => ()
+
| Pull when !state.ended =>
+
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull));
+
| Pull => ()
+
}
+
}));
+
}));
+
+
let merge = sources => {
+
open Wonka_source_fromArray;
+
mergeMap((.x) => x, fromArray(sources));
+
};
+
+
let mergeAll = source => mergeMap((.x) => x, source);
+
let flatten = mergeAll;
+6
src/operators/wonka_operator_mergeMap.rei
···
+
open Wonka_types;
+
+
let mergeMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
+
let merge: (array(sourceT('a)), sinkT('a)) => unit;
+
let mergeAll: (sourceT(sourceT('a)), sinkT('a)) => unit;
+
let flatten: (sourceT(sourceT('a)), sinkT('a)) => unit;
+3
src/operators/wonka_operator_scan.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const scan: <A, B>(f: (acc: B, value: A) => B, acc: B) => Operator<A, B>;
+16
src/operators/wonka_operator_scan.re
···
+
open Wonka_types;
+
+
let scan = (f, seed) => curry(source => curry(sink => {
+
let acc = ref(seed);
+
+
source((.signal) => sink(.
+
switch (signal) {
+
| Push(x) => {
+
acc := f(acc^, x);
+
Push(acc^)
+
}
+
| Start(x) => Start(x)
+
| End => End
+
}
+
));
+
}));
+3
src/operators/wonka_operator_scan.rei
···
+
open Wonka_types;
+
+
let scan: (('b, 'a) => 'b, 'b, sourceT('a), sinkT('b)) => unit;
+3
src/operators/wonka_operator_share.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const share: <A>(source: Source<A>) => Source<A>;
+52
src/operators/wonka_operator_share.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type shareStateT('a) = {
+
mutable sinks: Rebel.Array.t(sinkT('a)),
+
mutable talkback: (.talkbackT) => unit,
+
mutable gotSignal: bool
+
};
+
+
let share = source => {
+
let state = {
+
sinks: Rebel.Array.makeEmpty(),
+
talkback: talkbackPlaceholder,
+
gotSignal: false
+
};
+
+
sink => {
+
state.sinks = Rebel.Array.append(state.sinks, sink);
+
+
if (Rebel.Array.size(state.sinks) === 1) {
+
source((.signal) => {
+
switch (signal) {
+
| Push(_) => {
+
state.gotSignal = false;
+
Rebel.Array.forEach(state.sinks, sink => sink(.signal));
+
}
+
| Start(x) => state.talkback = x
+
| End => {
+
Rebel.Array.forEach(state.sinks, sink => sink(.End));
+
state.sinks = Rebel.Array.makeEmpty();
+
}
+
}
+
});
+
};
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => {
+
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
+
if (Rebel.Array.size(state.sinks) === 0) {
+
state.talkback(.Close);
+
};
+
}
+
| Pull when !state.gotSignal => {
+
state.gotSignal = true;
+
state.talkback(.signal);
+
}
+
| Pull => ()
+
}
+
}));
+
}
+
};
+3
src/operators/wonka_operator_share.rei
···
+
open Wonka_types;
+
+
let share: (sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_skip.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const skip: <A>(max: number) => Operator<A, A>;
+16
src/operators/wonka_operator_skip.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let skip = wait => curry(source => curry(sink => {
+
let rest = ref(wait);
+
+
captureTalkback(source, (.signal, talkback) => {
+
switch (signal) {
+
| Push(_) when rest^ > 0 => {
+
rest := rest^ - 1;
+
talkback(.Pull);
+
}
+
| _ => sink(.signal)
+
}
+
});
+
}));
+3
src/operators/wonka_operator_skip.rei
···
+
open Wonka_types;
+
+
let skip: (int, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_skipUntil.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const skipUntil: <A>(signal: Source<any>) => Operator<A, A>;
+69
src/operators/wonka_operator_skipUntil.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type skipUntilStateT = {
+
mutable skip: bool,
+
mutable ended: bool,
+
mutable gotSignal: bool,
+
mutable sourceTalkback: (.talkbackT) => unit,
+
mutable notifierTalkback: (.talkbackT) => unit
+
};
+
+
let skipUntil = notifier => curry(source => curry(sink => {
+
let state: skipUntilStateT = {
+
skip: true,
+
ended: false,
+
gotSignal: false,
+
sourceTalkback: talkbackPlaceholder,
+
notifierTalkback: talkbackPlaceholder
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => {
+
state.sourceTalkback = tb;
+
+
notifier((.signal) => {
+
switch (signal) {
+
| Start(innerTb) => {
+
state.notifierTalkback = innerTb;
+
innerTb(.Pull);
+
tb(.Pull);
+
}
+
| Push(_) => {
+
state.skip = false;
+
state.notifierTalkback(.Close);
+
}
+
| End => ()
+
}
+
});
+
}
+
| Push(_) when state.skip && !state.ended => state.sourceTalkback(.Pull)
+
| Push(_) when !state.ended => {
+
state.gotSignal = false;
+
sink(.signal)
+
}
+
| Push(_) => ()
+
| End => {
+
if (state.skip) state.notifierTalkback(.Close);
+
state.ended = true;
+
sink(.End)
+
}
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => {
+
if (state.skip) state.notifierTalkback(.Close);
+
state.ended = true;
+
state.sourceTalkback(.Close);
+
}
+
| Pull when !state.gotSignal && !state.ended => {
+
state.gotSignal = true;
+
state.sourceTalkback(.Pull);
+
}
+
| Pull => ()
+
}
+
}));
+
}));
+3
src/operators/wonka_operator_skipUntil.rei
···
+
open Wonka_types;
+
+
let skipUntil: (sourceT('a), sourceT('b), sinkT('b)) => unit;
+3
src/operators/wonka_operator_skipWhile.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const skipWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
+20
src/operators/wonka_operator_skipWhile.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let skipWhile = f => curry(source => curry(sink => {
+
let skip = ref(true);
+
+
captureTalkback(source, (.signal, talkback) => {
+
switch (signal) {
+
| Push(x) when skip^ => {
+
if (f(.x)) {
+
talkback(.Pull);
+
} else {
+
skip := false;
+
sink(.signal);
+
};
+
}
+
| _ => sink(.signal)
+
}
+
});
+
}));
+3
src/operators/wonka_operator_skipWhile.rei
···
+
open Wonka_types;
+
+
let skipWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_switchMap.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const switchMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
+78
src/operators/wonka_operator_switchMap.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type switchMapStateT('a) = {
+
mutable outerTalkback: (.talkbackT) => unit,
+
mutable innerTalkback: (.talkbackT) => unit,
+
mutable innerActive: bool,
+
mutable closed: bool,
+
mutable ended: bool
+
};
+
+
let switchMap = f => curry(source => curry(sink => {
+
let state: switchMapStateT('a) = {
+
outerTalkback: talkbackPlaceholder,
+
innerTalkback: talkbackPlaceholder,
+
innerActive: false,
+
closed: false,
+
ended: false
+
};
+
+
let applyInnerSource = innerSource =>
+
innerSource((.signal) => {
+
switch (signal) {
+
| End => {
+
state.innerActive = false;
+
state.innerTalkback = talkbackPlaceholder;
+
if (state.ended) sink(.End);
+
}
+
| Start(tb) => {
+
state.innerActive = true;
+
state.innerTalkback = tb;
+
tb(.Pull);
+
}
+
| Push(x) when !state.closed => {
+
sink(.Push(x));
+
state.innerTalkback(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
+
source((.signal) => {
+
switch (signal) {
+
| End when !state.ended => {
+
state.ended = true;
+
if (!state.innerActive) sink(.End);
+
}
+
| End => ()
+
| Start(tb) => {
+
state.outerTalkback = tb;
+
tb(.Pull);
+
}
+
| Push(x) when !state.ended => {
+
if (state.innerActive) {
+
state.innerTalkback(.Close);
+
state.innerTalkback = talkbackPlaceholder;
+
}
+
applyInnerSource(f(.x));
+
state.outerTalkback(.Pull);
+
}
+
| Push(_) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Pull => state.innerTalkback(.Pull)
+
| Close when !state.ended => {
+
state.ended = true;
+
state.closed = true;
+
state.outerTalkback(.Close);
+
state.innerTalkback(.Close);
+
state.innerTalkback = talkbackPlaceholder;
+
}
+
| Close => ()
+
}
+
}));
+
}));
+3
src/operators/wonka_operator_switchMap.rei
···
+
open Wonka_types;
+
+
let switchMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
+3
src/operators/wonka_operator_take.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const take: <A>(max: number) => Operator<A, A>;
+47
src/operators/wonka_operator_take.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type takeStateT = {
+
mutable taken: int,
+
mutable talkback: (.talkbackT) => unit
+
};
+
+
let take = max => curry(source => curry(sink => {
+
let state: takeStateT = {
+
taken: 0,
+
talkback: talkbackPlaceholder
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => state.talkback = tb;
+
| Push(_) when state.taken < max => {
+
state.taken = state.taken + 1;
+
sink(.signal);
+
+
if (state.taken === max) {
+
sink(.End);
+
state.talkback(.Close);
+
};
+
}
+
| Push(_) => ()
+
| End when state.taken < max => {
+
state.taken = max;
+
sink(.End)
+
}
+
| End => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
if (state.taken < max) {
+
switch (signal) {
+
| Pull => state.talkback(.Pull);
+
| Close => {
+
state.taken = max;
+
state.talkback(.Close);
+
}
+
}
+
};
+
}));
+
}));
+3
src/operators/wonka_operator_take.rei
···
+
open Wonka_types;
+
+
let take: (int, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_takeLast.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const takeLast: <A>(max: number) => Operator<A, A>;
+23
src/operators/wonka_operator_takeLast.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let takeLast = max => curry(source => curry(sink => {
+
open Rebel;
+
let queue = MutableQueue.make();
+
+
captureTalkback(source, (.signal, talkback) => {
+
switch (signal) {
+
| Start(_) => talkback(.Pull)
+
| Push(x) => {
+
let size = MutableQueue.size(queue);
+
if (size >= max && max > 0) {
+
ignore(MutableQueue.pop(queue));
+
};
+
+
MutableQueue.add(queue, x);
+
talkback(.Pull);
+
}
+
| End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
+
}
+
});
+
}));
+3
src/operators/wonka_operator_takeLast.rei
···
+
open Wonka_types;
+
+
let takeLast: (int, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_takeUntil.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const takeUntil: <A>(signal: Source<any>) => Operator<A, A>;
+60
src/operators/wonka_operator_takeUntil.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type takeUntilStateT = {
+
mutable ended: bool,
+
mutable sourceTalkback: (.talkbackT) => unit,
+
mutable notifierTalkback: (.talkbackT) => unit
+
};
+
+
let takeUntil = notifier => curry(source => curry(sink => {
+
let state: takeUntilStateT = {
+
ended: false,
+
sourceTalkback: talkbackPlaceholder,
+
notifierTalkback: talkbackPlaceholder
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => {
+
state.sourceTalkback = tb;
+
+
notifier((.signal) => {
+
switch (signal) {
+
| Start(innerTb) => {
+
state.notifierTalkback = innerTb;
+
innerTb(.Pull);
+
}
+
| Push(_) => {
+
state.ended = true;
+
state.notifierTalkback(.Close);
+
state.sourceTalkback(.Close);
+
sink(.End);
+
}
+
| End => ()
+
}
+
});
+
}
+
| End when !state.ended => {
+
state.notifierTalkback(.Close);
+
state.ended = true;
+
sink(.End);
+
}
+
| End => ()
+
| Push(_) when !state.ended => sink(.signal)
+
| Push(_) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
if (!state.ended) {
+
switch (signal) {
+
| Close => {
+
state.sourceTalkback(.Close);
+
state.notifierTalkback(.Close);
+
}
+
| Pull => state.sourceTalkback(.Pull)
+
}
+
};
+
}));
+
}));
+3
src/operators/wonka_operator_takeUntil.rei
···
+
open Wonka_types;
+
+
let takeUntil: (sourceT('a), sourceT('b), sinkT('b)) => unit;
+3
src/operators/wonka_operator_takeWhile.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const takeWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
+43
src/operators/wonka_operator_takeWhile.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let takeWhile = f => curry(source => curry(sink => {
+
let ended = ref(false);
+
let talkback = ref(talkbackPlaceholder);
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => {
+
talkback := tb;
+
sink(.signal);
+
}
+
| End when !ended^ => {
+
ended := true;
+
sink(.End);
+
}
+
| End => ()
+
| Push(x) when !ended^ => {
+
if (!f(.x)) {
+
ended := true;
+
sink(.End);
+
talkback^(.Close);
+
} else {
+
sink(.signal);
+
};
+
}
+
| Push(_) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
if (!ended^) {
+
switch (signal) {
+
| Pull => talkback^(.Pull);
+
| Close => {
+
ended := true;
+
talkback^(.Close);
+
}
+
}
+
};
+
}));
+
}));
+3
src/operators/wonka_operator_takeWhile.rei
···
+
open Wonka_types;
+
+
let takeWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
+3
src/operators/wonka_operator_tap.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const tap: <A>(f: (value: A) => void) => Operator<A, A>;
+12
src/operators/wonka_operator_tap.re
···
+
open Wonka_types;
+
+
let tap = f => curry(source => curry(sink => {
+
source((.signal) => {
+
switch (signal) {
+
| Push(x) => f(.x)
+
| _ => ()
+
};
+
+
sink(.signal);
+
});
+
}));
+3
src/operators/wonka_operator_tap.rei
···
+
open Wonka_types;
+
+
let tap: ((.'a) => unit, sourceT('a), sinkT('a)) => unit;
+3
src/sinks/wonka_sink_publish.d.ts
···
+
import { Source, Subscription } from '../wonka_types';
+
+
export const publish: <A>(source: Source<A>) => Subscription;
+33
src/sinks/wonka_sink_publish.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type publishStateT = {
+
mutable talkback: (.talkbackT) => unit,
+
mutable ended: bool
+
};
+
+
let publish = source => {
+
let state: publishStateT = {
+
talkback: talkbackPlaceholder,
+
ended: false
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(x) => {
+
state.talkback = x;
+
x(.Pull);
+
}
+
| Push(_) => if (!state.ended) state.talkback(.Pull);
+
| End => state.ended = true;
+
}
+
});
+
+
{
+
unsubscribe: () =>
+
if (!state.ended) {
+
state.ended = true;
+
state.talkback(.Close);
+
}
+
}
+
};
+3
src/sinks/wonka_sink_publish.rei
···
+
open Wonka_types;
+
+
let publish: sourceT('a) => subscriptionT;
+4
src/sinks/wonka_sink_subscribe.d.ts
···
+
import { Source, Subscription } from '../wonka_types';
+
+
export const subscribe: <A>(f: (x: A) => void) => (source: Source<A>) => Subscription;
+
export const forEach: <A>(f: (x: A) => void) => (source: Source<A>) => void;
+41
src/sinks/wonka_sink_subscribe.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
type subscribeStateT = {
+
mutable talkback: (.talkbackT) => unit,
+
mutable ended: bool
+
};
+
+
let subscribe = f => curry(source => {
+
let state: subscribeStateT = {
+
talkback: talkbackPlaceholder,
+
ended: false
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(x) => {
+
state.talkback = x;
+
x(.Pull);
+
}
+
| Push(x) when !state.ended => {
+
f(.x);
+
state.talkback(.Pull);
+
}
+
| Push(_) => ()
+
| End => state.ended = true;
+
}
+
});
+
+
{
+
unsubscribe: () =>
+
if (!state.ended) {
+
state.ended = true;
+
state.talkback(.Close);
+
}
+
}
+
});
+
+
let forEach = f => curry(source => {
+
ignore(subscribe(f, source));
+
});
+4
src/sinks/wonka_sink_subscribe.rei
···
+
open Wonka_types;
+
+
let subscribe: ((.'a) => unit, sourceT('a)) => subscriptionT;
+
let forEach: ((.'a) => unit, sourceT('a)) => unit;
+3
src/sources/wonka_source_fromArray.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const fromArray: <A>(array: A[]) => Source<A>;
+44
src/sources/wonka_source_fromArray.re
···
+
open Wonka_types;
+
+
type fromArrayState('a) = {
+
mutable index: int,
+
mutable ended: bool,
+
mutable looping: bool,
+
mutable pull: bool
+
};
+
+
let fromArray = arr => curry(sink => {
+
let size = Rebel.Array.size(arr);
+
let state = {
+
index: 0,
+
ended: false,
+
looping: false,
+
pull: false
+
};
+
+
sink(.Start((.signal) => {
+
switch (signal, state.looping) {
+
| (Pull, false) => {
+
state.pull = true;
+
state.looping = true;
+
+
while (state.pull && !state.ended) {
+
let index = state.index;
+
if (index < size) {
+
let x = Rebel.Array.getUnsafe(arr, index);
+
state.index = index + 1;
+
state.pull = false;
+
sink(.Push(x));
+
} else {
+
state.ended = true;
+
sink(.End);
+
}
+
};
+
+
state.looping = false;
+
}
+
| (Pull, true) => state.pull = true
+
| (Close, _) => state.ended = true
+
}
+
}));
+
});
+3
src/sources/wonka_source_fromArray.rei
···
+
open Wonka_types;
+
+
let fromArray: (array('a), sinkT('a)) => unit;
+3
src/sources/wonka_source_fromList.d.ts
···
+
import { List, Source } from '../wonka_types';
+
+
export const fromList: <A>(list: List<A>) => Source<A>;
+44
src/sources/wonka_source_fromList.re
···
+
open Wonka_types;
+
+
type fromListState('a) = {
+
mutable value: 'a,
+
mutable ended: bool,
+
mutable looping: bool,
+
mutable pull: bool
+
};
+
+
let fromList = ls => curry(sink => {
+
let state = {
+
value: ls,
+
ended: false,
+
looping: false,
+
pull: false
+
};
+
+
sink(.Start((.signal) => {
+
switch (signal, state.looping) {
+
| (Pull, false) => {
+
state.pull = true;
+
state.looping = true;
+
+
while (state.pull && !state.ended) {
+
switch (state.value) {
+
| [x, ...rest] => {
+
state.value = rest;
+
state.pull = false;
+
sink(.Push(x));
+
}
+
| [] => {
+
state.ended = true;
+
sink(.End);
+
}
+
}
+
};
+
+
state.looping = false;
+
}
+
| (Pull, true) => state.pull = true
+
| (Close, _) => state.ended = true
+
}
+
}));
+
});
+3
src/sources/wonka_source_fromList.rei
···
+
open Wonka_types;
+
+
let fromList: (list('a), sinkT('a)) => unit;
+3
src/sources/wonka_source_fromValue.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const fromValue: <A>(value: A) => Source<A>;
+16
src/sources/wonka_source_fromValue.re
···
+
open Wonka_types;
+
+
let fromValue = x => curry(sink => {
+
let ended = ref(false);
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Pull when !ended^ => {
+
ended := true;
+
sink(.Push(x));
+
sink(.End);
+
}
+
| _ => ()
+
}
+
}));
+
});
+3
src/sources/wonka_source_fromValue.rei
···
+
open Wonka_types;
+
+
let fromValue: ('a, sinkT('a)) => unit;
+3
src/sources/wonka_source_make.d.ts
···
+
import { Source, Observer } from '../wonka_types';
+
+
export const make: <A>(f: (observer: Observer<A>) => (() => void)) => Source<A>;
+15
src/sources/wonka_source_make.re
···
+
open Wonka_types;
+
+
let make = f => curry(sink => {
+
let teardown = f(.{
+
next: value => sink(.Push(value)),
+
complete: () => sink(.End)
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => teardown(.)
+
| Pull => ()
+
}
+
}));
+
});
+3
src/sources/wonka_source_make.rei
···
+
open Wonka_types;
+
+
let make: ((.observerT('a)) => teardownT, sinkT('a)) => unit;
+3
src/sources/wonka_source_makeSubject.d.ts
···
+
import { Subject } from '../wonka_types';
+
+
export const makeSubject: <A>() => Subject<A>;
+35
src/sources/wonka_source_makeSubject.re
···
+
open Wonka_types;
+
+
type subjectState('a) = {
+
mutable sinks: Rebel.Array.t(sinkT('a)),
+
mutable ended: bool
+
};
+
+
let makeSubject = () => {
+
let state: subjectState('a) = {
+
sinks: Rebel.Array.makeEmpty(),
+
ended: false,
+
};
+
+
let source = sink => {
+
state.sinks = Rebel.Array.append(state.sinks, sink);
+
sink(.Start((.signal) => {
+
if (signal === Close) {
+
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
+
}
+
}));
+
};
+
+
let next = value =>
+
if (!state.ended) {
+
Rebel.Array.forEach(state.sinks, sink => sink(.Push(value)));
+
};
+
+
let complete = () =>
+
if (!state.ended) {
+
state.ended = true;
+
Rebel.Array.forEach(state.sinks, sink => sink(.End));
+
};
+
+
{ source, next, complete }
+
};
+3
src/sources/wonka_source_makeSubject.rei
···
+
open Wonka_types;
+
+
let makeSubject: unit => subjectT('a);
+4
src/sources/wonka_source_primitives.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const empty: Source<{}>;
+
export const never: Source<{}>;
+11
src/sources/wonka_source_primitives.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let empty = sink => {
+
sink(.Start(talkbackPlaceholder));
+
sink(.End);
+
};
+
+
let never = sink => {
+
sink(.Start(talkbackPlaceholder));
+
};
+4
src/sources/wonka_source_primitives.rei
···
+
open Wonka_types;
+
+
let empty: (sinkT('a)) => unit;
+
let never: (sinkT('a)) => unit;
+12 -15
src/web/wonkaJs.d.ts
···
-
import { Sink, Source, Operator } from '../wonka_types';
+
/* operators */
+
export * from './wonka_operator_debounce';
+
export * from './wonka_operator_delay';
+
export * from './wonka_operator_interval';
+
export * from './wonka_operator_sample';
+
export * from './wonka_operator_throttle';
-
export const fromListener: <E>(
-
addListener: (cb: (event: E) => void) => void,
-
removeListener: (cb: (event: E) => void) => void
-
) => Source<E>;
+
/* sinks */
+
export * from './wonka_sink_toPromise';
-
export const fromDomEvent: <E>(HTMLElement, string) => Source<E>;
-
export const interval: (interval: number) => Source<number>;
-
export const fromPromise: <A>(promise: Promise<A>) => Source<A>;
-
-
export const debounce: <A>(f: (x: A) => number) => Operator<A, A>;
-
export const throttle: <A>(f: (x: A) => number) => Operator<A, A>;
-
export const sample: <A>(signal: Source<any>) => Operator<A, A>;
-
export const delay: <A>(duration: number) => Operator<A, A>;
-
-
export const toPromise: <A>(source: Source<A>) => Promise<A>;
+
/* sources */
+
export * from './wonka_source_fromDomEvent';
+
export * from './wonka_source_fromListener';
+
export * from './wonka_source_fromPromise';
+12 -275
src/web/wonkaJs.re
···
-
open Wonka_types;
-
-
let fromListener = (addListener, removeListener) => curry(sink => {
-
let handler = event => sink(.Push(event));
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => removeListener(handler)
-
| _ => ()
-
}
-
}));
-
-
addListener(handler);
-
});
-
-
let fromDomEvent = (element, event) => curry(sink => {
-
let addEventListener: (
-
Dom.element,
-
string,
-
(Dom.event) => unit
-
) => unit = [%raw {|
-
function (element, event, handler) {
-
element.addEventListener(event, handler);
-
}
-
|}];
-
-
let removeEventListener: (
-
Dom.element,
-
string,
-
(Dom.event) => unit
-
) => unit = [%raw {|
-
function (element, event, handler) {
-
element.removeEventListener(event, handler);
-
}
-
|}];
-
-
fromListener(
-
handler => addEventListener(element, event, handler),
-
handler => removeEventListener(element, event, handler),
-
sink
-
)
-
});
-
-
let interval = p => curry(sink => {
-
let i = ref(0);
-
let id = Js.Global.setInterval(() => {
-
let num = i^;
-
i := i^ + 1;
-
sink(.Push(num));
-
}, p);
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => Js.Global.clearInterval(id)
-
| _ => ()
-
}
-
}));
-
});
-
-
let fromPromise = promise => curry(sink => {
-
let ended = ref(false);
-
-
ignore(Js.Promise.then_(value => {
-
if (!ended^) {
-
sink(.Push(value));
-
sink(.End);
-
};
-
-
Js.Promise.resolve(())
-
}, promise));
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => ended := true
-
| _ => ()
-
}
-
}));
-
});
-
-
let debounce = f => curry(source => curry(sink => {
-
let gotEndSignal = ref(false);
-
let id: ref(option(Js.Global.timeoutId)) = ref(None);
-
-
let clearTimeout = () =>
-
switch (id^) {
-
| Some(timeoutId) => {
-
id := None;
-
Js.Global.clearTimeout(timeoutId);
-
}
-
| None => ()
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => {
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => {
-
clearTimeout();
-
tb(.Close);
-
}
-
| _ => tb(.signal)
-
}
-
}));
-
}
-
| Push(x) => {
-
clearTimeout();
-
id := Some(Js.Global.setTimeout(() => {
-
id := None;
-
sink(.signal);
-
if (gotEndSignal^) sink(.End);
-
}, f(.x)));
-
}
-
| End => {
-
gotEndSignal := true;
-
-
switch (id^) {
-
| None => sink(.End)
-
| _ => ()
-
};
-
}
-
}
-
});
-
}));
+
/* operators */
+
include Wonka_operator_debounce;
+
include Wonka_operator_delay;
+
include Wonka_operator_interval;
+
include Wonka_operator_sample;
+
include Wonka_operator_throttle;
-
let throttle = f => curry(source => curry(sink => {
-
let skip = ref(false);
-
let id: ref(option(Js.Global.timeoutId)) = ref(None);
-
let clearTimeout = () =>
-
switch (id^) {
-
| Some(timeoutId) => Js.Global.clearTimeout(timeoutId);
-
| None => ()
-
};
+
/* sinks */
+
include Wonka_sink_toPromise;
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => {
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => {
-
clearTimeout();
-
tb(.Close);
-
}
-
| _ => tb(.signal)
-
}
-
}));
-
}
-
| End => {
-
clearTimeout();
-
sink(.End);
-
}
-
| Push(x) when !skip^ => {
-
skip := true;
-
clearTimeout();
-
id := Some(Js.Global.setTimeout(() => {
-
id := None;
-
skip := false;
-
}, f(.x)));
-
sink(.signal);
-
}
-
| Push(_) => ()
-
}
-
});
-
}));
-
-
type sampleStateT('a) = {
-
mutable ended: bool,
-
mutable value: option('a),
-
mutable sourceTalkback: (.talkbackT) => unit,
-
mutable notifierTalkback: (.talkbackT) => unit
-
};
-
-
let sample = notifier => curry(source => curry(sink => {
-
let state = {
-
ended: false,
-
value: None,
-
sourceTalkback: (._: talkbackT) => (),
-
notifierTalkback: (._: talkbackT) => ()
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => state.sourceTalkback = tb
-
| End => {
-
state.ended = true;
-
state.notifierTalkback(.Close);
-
sink(.End);
-
}
-
| Push(x) => state.value = Some(x)
-
}
-
});
-
-
notifier((.signal) => {
-
switch (signal, state.value) {
-
| (Start(tb), _) => state.notifierTalkback = tb
-
| (End, _) => {
-
state.ended = true;
-
state.sourceTalkback(.Close);
-
sink(.End);
-
}
-
| (Push(_), Some(x)) when !state.ended => {
-
state.value = None;
-
sink(.Push(x));
-
}
-
| (Push(_), _) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Pull => {
-
state.sourceTalkback(.Pull);
-
state.notifierTalkback(.Pull);
-
}
-
| Close => {
-
state.ended = true;
-
state.sourceTalkback(.Close);
-
state.notifierTalkback(.Close);
-
}
-
}
-
}));
-
}));
-
-
type delayStateT = {
-
mutable talkback: (.talkbackT) => unit,
-
mutable active: int,
-
mutable gotEndSignal: bool
-
};
-
-
let delay = wait => curry(source => curry(sink => {
-
let state: delayStateT = {
-
talkback: Wonka_helpers.talkbackPlaceholder,
-
active: 0,
-
gotEndSignal: false
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => state.talkback = tb
-
| _ when !state.gotEndSignal => {
-
state.active = state.active + 1;
-
ignore(Js.Global.setTimeout(() => {
-
if (state.gotEndSignal && state.active === 0) {
-
sink(.End);
-
} else {
-
state.active = state.active - 1;
-
};
-
-
sink(.signal);
-
}, wait));
-
}
-
| _ => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => {
-
state.gotEndSignal = true;
-
if (state.active === 0) sink(.End);
-
}
-
| _ when !state.gotEndSignal => state.talkback(.signal)
-
| _ => ()
-
}
-
}));
-
}));
-
-
let toPromise = source =>
-
Js.Promise.make((~resolve, ~reject as _) => {
-
Wonka.takeLast(1, source, (.signal) => {
-
switch (signal) {
-
| Start(x) => x(.Pull)
-
| Push(x) => resolve(.x)
-
| End => ()
-
}
-
});
-
});
+
/* sources */
+
include Wonka_source_fromDomEvent;
+
include Wonka_source_fromListener;
+
include Wonka_source_fromPromise;
-66
src/web/wonkaJs.rei
···
-
open Wonka_types;
-
-
/* -- source factories */
-
-
/* Accepts an event listening start function and stop function
-
and creates a listenable source that emits the received events.
-
This stream will emit values indefinitely until it receives an
-
End signal from a talkback passed downwards to its sink, which
-
calls the stop function using the internal handler.
-
This works well for Dom event listeners, for example the ones
-
in bs-webapi-incubator:
-
https://github.com/reasonml-community/bs-webapi-incubator/blob/master/src/dom/events/EventTargetRe.re
-
*/
-
let fromListener:
-
(
-
('event => unit) => unit,
-
('event => unit) => unit,
-
sinkT('event)
-
) =>
-
unit;
-
-
/* Accepts a Dom.element type and an event nme and creates a listenable
-
source that emits values of the Dom.event type. This stream is
-
created using the fromListener helper and more specific events
-
should be created using the methods in bs-webapi-incubator:
-
https://github.com/reasonml-community/bs-webapi-incubator/blob/master/src/dom/events/EventTargetRe.re
-
*/
-
let fromDomEvent: (Dom.element, string, sinkT(Dom.event)) => unit;
-
-
/* Accepts a period in milliseconds and creates a listenable source
-
that emits ascending numbers for each time the interval fires.
-
This stream will emit values indefinitely until it receives an
-
End signal from a talkback passed downwards to its sink. */
-
let interval: (int, sinkT(int)) => unit;
-
-
/* Accepts a JS promise and creates a listenable source that emits
-
the promise's value once it resolves.
-
This stream will wait for the promise's completion, unless it
-
receives an End signal first. */
-
let fromPromise: (Js.Promise.t('a), sinkT('a)) => unit;
-
-
/* -- operators */
-
-
/* Takes a projection to a period in milliseconds and a source, and creates
-
a listenable source that emits the last emitted value if no other value
-
has been emitted during the passed debounce period. */
-
let debounce: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a projection to a period in milliseconds and a source, and creates
-
a listenable source that ignores values after the last emitted value for
-
the duration of the returned throttle period. */
-
let throttle: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a notifier source and an input source, and creates a sink & source.
-
When the notifier emits a value, it will emit the value that it most recently
-
received from the input source, unless said source hasn't emitted anything
-
since the last signal. */
-
let sample: (sourceT('a), sourceT('b), sinkT('b)) => unit;
-
-
/* Takes a projection to a period in milliseconds and a source, and creates
-
a listenable source that delays every emission by that passed period. */
-
let delay: (int, sourceT('a), sinkT('a)) => unit;
-
-
/* Converts a stream into a promise by resolving to the last value of the
-
stream. */
-
let toPromise: sourceT('a) => Js.Promise.t('a);
+3
src/web/wonka_operator_debounce.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const debounce: <A>(f: (x: A) => number) => Operator<A, A>;
+47
src/web/wonka_operator_debounce.re
···
+
open Wonka_types;
+
+
let debounce = f => curry(source => curry(sink => {
+
let gotEndSignal = ref(false);
+
let id: ref(option(Js.Global.timeoutId)) = ref(None);
+
+
let clearTimeout = () =>
+
switch (id^) {
+
| Some(timeoutId) => {
+
id := None;
+
Js.Global.clearTimeout(timeoutId);
+
}
+
| None => ()
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => {
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => {
+
clearTimeout();
+
tb(.Close);
+
}
+
| _ => tb(.signal)
+
}
+
}));
+
}
+
| Push(x) => {
+
clearTimeout();
+
id := Some(Js.Global.setTimeout(() => {
+
id := None;
+
sink(.signal);
+
if (gotEndSignal^) sink(.End);
+
}, f(.x)));
+
}
+
| End => {
+
gotEndSignal := true;
+
+
switch (id^) {
+
| None => sink(.End)
+
| _ => ()
+
};
+
}
+
}
+
});
+
}));
+3
src/web/wonka_operator_debounce.rei
···
+
open Wonka_types;
+
+
let debounce: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
+3
src/web/wonka_operator_delay.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const delay: <A>(duration: number) => Operator<A, A>;
+45
src/web/wonka_operator_delay.re
···
+
open Wonka_types;
+
+
type delayStateT = {
+
mutable talkback: (.talkbackT) => unit,
+
mutable active: int,
+
mutable gotEndSignal: bool
+
};
+
+
let delay = wait => curry(source => curry(sink => {
+
let state: delayStateT = {
+
talkback: Wonka_helpers.talkbackPlaceholder,
+
active: 0,
+
gotEndSignal: false
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => state.talkback = tb
+
| _ when !state.gotEndSignal => {
+
state.active = state.active + 1;
+
ignore(Js.Global.setTimeout(() => {
+
if (state.gotEndSignal && state.active === 0) {
+
sink(.End);
+
} else {
+
state.active = state.active - 1;
+
};
+
+
sink(.signal);
+
}, wait));
+
}
+
| _ => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => {
+
state.gotEndSignal = true;
+
if (state.active === 0) sink(.End);
+
}
+
| _ when !state.gotEndSignal => state.talkback(.signal)
+
| _ => ()
+
}
+
}));
+
}));
+3
src/web/wonka_operator_delay.rei
···
+
open Wonka_types;
+
+
let delay: (int, sourceT('a), sinkT('a)) => unit;
+3
src/web/wonka_operator_interval.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const interval: (interval: number) => Source<number>;
+17
src/web/wonka_operator_interval.re
···
+
open Wonka_types;
+
+
let interval = p => curry(sink => {
+
let i = ref(0);
+
let id = Js.Global.setInterval(() => {
+
let num = i^;
+
i := i^ + 1;
+
sink(.Push(num));
+
}, p);
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => Js.Global.clearInterval(id)
+
| _ => ()
+
}
+
}));
+
});
+3
src/web/wonka_operator_interval.rei
···
+
open Wonka_types;
+
+
let interval: (int, sinkT(int)) => unit;
+3
src/web/wonka_operator_sample.d.ts
···
+
import { Source, Operator } from '../wonka_types';
+
+
export const sample: <A>(signal: Source<any>) => Operator<A, A>;
+59
src/web/wonka_operator_sample.re
···
+
open Wonka_types;
+
+
type sampleStateT('a) = {
+
mutable ended: bool,
+
mutable value: option('a),
+
mutable sourceTalkback: (.talkbackT) => unit,
+
mutable notifierTalkback: (.talkbackT) => unit
+
};
+
+
let sample = notifier => curry(source => curry(sink => {
+
let state = {
+
ended: false,
+
value: None,
+
sourceTalkback: (._: talkbackT) => (),
+
notifierTalkback: (._: talkbackT) => ()
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => state.sourceTalkback = tb
+
| End => {
+
state.ended = true;
+
state.notifierTalkback(.Close);
+
sink(.End);
+
}
+
| Push(x) => state.value = Some(x)
+
}
+
});
+
+
notifier((.signal) => {
+
switch (signal, state.value) {
+
| (Start(tb), _) => state.notifierTalkback = tb
+
| (End, _) => {
+
state.ended = true;
+
state.sourceTalkback(.Close);
+
sink(.End);
+
}
+
| (Push(_), Some(x)) when !state.ended => {
+
state.value = None;
+
sink(.Push(x));
+
}
+
| (Push(_), _) => ()
+
}
+
});
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Pull => {
+
state.sourceTalkback(.Pull);
+
state.notifierTalkback(.Pull);
+
}
+
| Close => {
+
state.ended = true;
+
state.sourceTalkback(.Close);
+
state.notifierTalkback(.Close);
+
}
+
}
+
}));
+
}));
+3
src/web/wonka_operator_sample.rei
···
+
open Wonka_types;
+
+
let sample: (sourceT('a), sourceT('b), sinkT('b)) => unit;
+3
src/web/wonka_operator_throttle.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const throttle: <A>(f: (x: A) => number) => Operator<A, A>;
+41
src/web/wonka_operator_throttle.re
···
+
open Wonka_types;
+
+
let throttle = f => curry(source => curry(sink => {
+
let skip = ref(false);
+
let id: ref(option(Js.Global.timeoutId)) = ref(None);
+
let clearTimeout = () =>
+
switch (id^) {
+
| Some(timeoutId) => Js.Global.clearTimeout(timeoutId);
+
| None => ()
+
};
+
+
source((.signal) => {
+
switch (signal) {
+
| Start(tb) => {
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => {
+
clearTimeout();
+
tb(.Close);
+
}
+
| _ => tb(.signal)
+
}
+
}));
+
}
+
| End => {
+
clearTimeout();
+
sink(.End);
+
}
+
| Push(x) when !skip^ => {
+
skip := true;
+
clearTimeout();
+
id := Some(Js.Global.setTimeout(() => {
+
id := None;
+
skip := false;
+
}, f(.x)));
+
sink(.signal);
+
}
+
| Push(_) => ()
+
}
+
});
+
}));
+3
src/web/wonka_operator_throttle.rei
···
+
open Wonka_types;
+
+
let throttle: ((.'a) => int, sourceT('a), sinkT('a)) => unit;
+3
src/web/wonka_sink_toPromise.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const toPromise: <A>(source: Source<A>) => Promise<A>;
+12
src/web/wonka_sink_toPromise.re
···
+
open Wonka_types;
+
+
let toPromise = source =>
+
Js.Promise.make((~resolve, ~reject as _) => {
+
Wonka.takeLast(1, source, (.signal) => {
+
switch (signal) {
+
| Start(x) => x(.Pull)
+
| Push(x) => resolve(.x)
+
| End => ()
+
}
+
});
+
});
+3
src/web/wonka_sink_toPromise.rei
···
+
open Wonka_types;
+
+
let toPromise: sourceT('a) => Js.Promise.t('a);
+3
src/web/wonka_source_fromDomEvent.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const fromDomEvent: <E>(HTMLElement, string) => Source<E>;
+30
src/web/wonka_source_fromDomEvent.re
···
+
open Wonka_types;
+
open Wonka_source_fromListener;
+
+
let fromDomEvent = (element, event) => curry(sink => {
+
let addEventListener: (
+
Dom.element,
+
string,
+
(Dom.event) => unit
+
) => unit = [%raw {|
+
function (element, event, handler) {
+
element.addEventListener(event, handler);
+
}
+
|}];
+
+
let removeEventListener: (
+
Dom.element,
+
string,
+
(Dom.event) => unit
+
) => unit = [%raw {|
+
function (element, event, handler) {
+
element.removeEventListener(event, handler);
+
}
+
|}];
+
+
fromListener(
+
handler => addEventListener(element, event, handler),
+
handler => removeEventListener(element, event, handler),
+
sink
+
)
+
});
+3
src/web/wonka_source_fromDomEvent.rei
···
+
open Wonka_types;
+
+
let fromDomEvent: (Dom.element, string, sinkT(Dom.event)) => unit;
+6
src/web/wonka_source_fromListener.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export const fromListener: <E>(
+
addListener: (cb: (event: E) => void) => void,
+
removeListener: (cb: (event: E) => void) => void
+
) => Source<E>;
+14
src/web/wonka_source_fromListener.re
···
+
open Wonka_types;
+
+
let fromListener = (addListener, removeListener) => curry(sink => {
+
let handler = event => sink(.Push(event));
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => removeListener(handler)
+
| _ => ()
+
}
+
}));
+
+
addListener(handler);
+
});
+8
src/web/wonka_source_fromListener.rei
···
+
open Wonka_types;
+
+
let fromListener:
+
(
+
('event => unit) => unit,
+
('event => unit) => unit,
+
sinkT('event)
+
) => unit;
+3
src/web/wonka_source_fromPromise.d.ts
···
+
import { Operator } from '../wonka_types';
+
+
export const fromPromise: <A>(promise: Promise<A>) => Source<A>;
+21
src/web/wonka_source_fromPromise.re
···
+
open Wonka_types;
+
+
let fromPromise = promise => curry(sink => {
+
let ended = ref(false);
+
+
ignore(Js.Promise.then_(value => {
+
if (!ended^) {
+
sink(.Push(value));
+
sink(.End);
+
};
+
+
Js.Promise.resolve(())
+
}, promise));
+
+
sink(.Start((.signal) => {
+
switch (signal) {
+
| Close => ended := true
+
| _ => ()
+
}
+
}));
+
});
+3
src/web/wonka_source_fromPromise.rei
···
+
open Wonka_types;
+
+
let fromPromise: (Js.Promise.t('a), sinkT('a)) => unit;
+27 -38
src/wonka.d.ts
···
-
import { List, Sink, Source, Subscription, Operator, Observer, Subject } from './wonka_types';
+
/* sources */
+
export * from './sources/wonka_source_fromArray';
+
export * from './sources/wonka_source_fromList';
+
export * from './sources/wonka_source_fromValue';
+
export * from './sources/wonka_source_make';
+
export * from './sources/wonka_source_makeSubject';
+
export * from './sources/wonka_source_primitives';
-
export const makeSubject: <A>() => Subject<A>;
+
/* operators */
+
export * from './operators/wonka_operator_combine';
+
export * from './operators/wonka_operator_concatMap';
+
export * from './operators/wonka_operator_filter';
+
export * from './operators/wonka_operator_map';
+
export * from './operators/wonka_operator_mergeMap';
+
export * from './operators/wonka_operator_scan';
+
export * from './operators/wonka_operator_share';
+
export * from './operators/wonka_operator_skip';
+
export * from './operators/wonka_operator_skipUntil';
+
export * from './operators/wonka_operator_skipWhile';
+
export * from './operators/wonka_operator_switchMap';
+
export * from './operators/wonka_operator_take';
+
export * from './operators/wonka_operator_takeLast';
+
export * from './operators/wonka_operator_takeUntil';
+
export * from './operators/wonka_operator_takeWhile';
+
export * from './operators/wonka_operator_tap';
-
export const make: <A>(f: (observer: Observer<A>) => (() => void)) => Source<A>;
-
export const fromList: <A>(list: List<A>) => Source<A>;
-
export const fromArray: <A>(array: A[]) => Source<A>;
-
export const fromValue: <A>(value: A) => Source<A>;
-
export const empty: Source<{}>;
-
export const never: Source<{}>;
-
-
export const tap: <A>(f: (value: A) => void) => Operator<A, A>;
-
export const map: <A, B>(f: (value: A) => B) => Operator<A, B>;
-
export const filter: <A>(f: (value: A) => boolean) => Operator<A, A>;
-
export const scan: <A, B>(f: (acc: B, value: A) => B, acc: B) => Operator<A, B>;
-
-
export const mergeMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
-
export const switchMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
-
export const concatMap: <A, B>(f: (value: A) => Source<B>) => Operator<A, B>;
-
-
export const merge: <A>(sources: Array<Source<A>>) => Source<A>;
-
export const concat: <A>(sources: Array<Source<A>>) => Source<A>;
-
export const share: <A>(source: Source<A>) => Source<A>;
-
export const combine: <A, B>(a: Source<A>, b: Source<B>) => Source<[A, B]>;
-
-
export const concatAll: <A>(source: Source<Source<A>>) => Source<A>;
-
export const mergeAll: <A>(source: Source<Source<A>>) => Source<A>;
-
export const flatten: <A>(source: Source<Source<A>>) => Source<A>;
-
-
export const take: <A>(max: number) => Operator<A, A>;
-
export const takeLast: <A>(max: number) => Operator<A, A>;
-
export const takeWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
-
export const takeUntil: <A>(signal: Source<any>) => Operator<A, A>;
-
export const skip: <A>(max: number) => Operator<A, A>;
-
export const skipWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
-
export const skipUntil: <A>(signal: Source<any>) => Operator<A, A>;
-
-
export const publish: <A>(source: Source<A>) => Subscription;
-
export const forEach: <A>(f: (x: A) => void) => (source: Source<A>) => void;
-
export const subscribe: <A>(f: (x: A) => void) => (source: Source<A>) => Subscription;
+
/* sinks */
+
export * from './sinks/wonka_sink_publish';
+
export * from './sinks/wonka_sink_subscribe';
+37
src/wonka.ml
···
+
module Types = Wonka_types
+
+
(* sources *)
+
include Wonka_source_fromArray
+
include Wonka_source_fromList
+
include Wonka_source_fromValue
+
include Wonka_source_make
+
include Wonka_source_makeSubject
+
include Wonka_source_primitives
+
+
(* operators *)
+
include Wonka_operator_combine
+
include Wonka_operator_concatMap
+
include Wonka_operator_filter
+
include Wonka_operator_map
+
include Wonka_operator_mergeMap
+
include Wonka_operator_scan
+
include Wonka_operator_share
+
include Wonka_operator_skip
+
include Wonka_operator_skipUntil
+
include Wonka_operator_skipWhile
+
include Wonka_operator_switchMap
+
include Wonka_operator_take
+
include Wonka_operator_takeLast
+
include Wonka_operator_takeUntil
+
include Wonka_operator_takeWhile
+
include Wonka_operator_tap
+
+
(* sinks *)
+
include Wonka_sink_publish
+
include Wonka_sink_subscribe
+
+
#if BS_NATIVE then
+
#if BSB_BACKEND = "js" then
+
include WonkaJs
+
#end
+
#end
-913
src/wonka.re
···
-
open Wonka_types;
-
open Wonka_helpers;
-
-
module Types = Wonka_types;
-
-
type subjectState('a) = {
-
mutable sinks: Rebel.Array.t(sinkT('a)),
-
mutable ended: bool
-
};
-
-
let makeSubject = () => {
-
let state: subjectState('a) = {
-
sinks: Rebel.Array.makeEmpty(),
-
ended: false,
-
};
-
-
let source = sink => {
-
state.sinks = Rebel.Array.append(state.sinks, sink);
-
sink(.Start((.signal) => {
-
if (signal === Close) {
-
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
-
}
-
}));
-
};
-
-
let next = value =>
-
if (!state.ended) {
-
Rebel.Array.forEach(state.sinks, sink => sink(.Push(value)));
-
};
-
-
let complete = () =>
-
if (!state.ended) {
-
state.ended = true;
-
Rebel.Array.forEach(state.sinks, sink => sink(.End));
-
};
-
-
{ source, next, complete }
-
};
-
-
let make = f => curry(sink => {
-
let teardown = f(.{
-
next: value => sink(.Push(value)),
-
complete: () => sink(.End)
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => teardown(.)
-
| Pull => ()
-
}
-
}));
-
});
-
-
type fromListState('a) = {
-
mutable value: 'a,
-
mutable ended: bool,
-
mutable looping: bool,
-
mutable pull: bool
-
};
-
-
let fromList = ls => curry(sink => {
-
let state = {
-
value: ls,
-
ended: false,
-
looping: false,
-
pull: false
-
};
-
-
sink(.Start((.signal) => {
-
switch (signal, state.looping) {
-
| (Pull, false) => {
-
state.pull = true;
-
state.looping = true;
-
-
while (state.pull && !state.ended) {
-
switch (state.value) {
-
| [x, ...rest] => {
-
state.value = rest;
-
state.pull = false;
-
sink(.Push(x));
-
}
-
| [] => {
-
state.ended = true;
-
sink(.End);
-
}
-
}
-
};
-
-
state.looping = false;
-
}
-
| (Pull, true) => state.pull = true
-
| (Close, _) => state.ended = true
-
}
-
}));
-
});
-
-
type fromArrayState('a) = {
-
mutable index: int,
-
mutable ended: bool,
-
mutable looping: bool,
-
mutable pull: bool
-
};
-
-
let fromArray = arr => curry(sink => {
-
let size = Rebel.Array.size(arr);
-
let state = {
-
index: 0,
-
ended: false,
-
looping: false,
-
pull: false
-
};
-
-
sink(.Start((.signal) => {
-
switch (signal, state.looping) {
-
| (Pull, false) => {
-
state.pull = true;
-
state.looping = true;
-
-
while (state.pull && !state.ended) {
-
let index = state.index;
-
if (index < size) {
-
let x = Rebel.Array.getUnsafe(arr, index);
-
state.index = index + 1;
-
state.pull = false;
-
sink(.Push(x));
-
} else {
-
state.ended = true;
-
sink(.End);
-
}
-
};
-
-
state.looping = false;
-
}
-
| (Pull, true) => state.pull = true
-
| (Close, _) => state.ended = true
-
}
-
}));
-
});
-
-
let fromValue = x => curry(sink => {
-
let ended = ref(false);
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Pull when !ended^ => {
-
ended := true;
-
sink(.Push(x));
-
sink(.End);
-
}
-
| _ => ()
-
}
-
}));
-
});
-
-
let empty = sink => {
-
sink(.Start(talkbackPlaceholder));
-
sink(.End);
-
};
-
-
let never = sink => {
-
sink(.Start(talkbackPlaceholder));
-
};
-
-
let tap = f => curry(source => curry(sink => {
-
source((.signal) => {
-
switch (signal) {
-
| Push(x) => f(.x)
-
| _ => ()
-
};
-
-
sink(.signal);
-
});
-
}));
-
-
let map = f => curry(source => curry(sink => {
-
source((.signal) => sink(.
-
switch (signal) {
-
| Start(x) => Start(x)
-
| Push(x) => Push(f(.x))
-
| End => End
-
}
-
));
-
}));
-
-
let filter = f => curry(source => curry(sink => {
-
captureTalkback(source, (.signal, talkback) => {
-
switch (signal) {
-
| Push(x) when !f(.x) => talkback(.Pull)
-
| _ => sink(.signal)
-
}
-
});
-
}));
-
-
let scan = (f, seed) => curry(source => curry(sink => {
-
let acc = ref(seed);
-
-
source((.signal) => sink(.
-
switch (signal) {
-
| Push(x) => {
-
acc := f(acc^, x);
-
Push(acc^)
-
}
-
| Start(x) => Start(x)
-
| End => End
-
}
-
));
-
}));
-
-
type mergeMapStateT = {
-
mutable outerTalkback: (.talkbackT) => unit,
-
mutable innerTalkbacks: Rebel.Array.t((.talkbackT) => unit),
-
mutable ended: bool
-
};
-
-
let mergeMap = f => curry(source => curry(sink => {
-
let state: mergeMapStateT = {
-
outerTalkback: talkbackPlaceholder,
-
innerTalkbacks: Rebel.Array.makeEmpty(),
-
ended: false
-
};
-
-
let applyInnerSource = innerSource => {
-
let talkback = ref(talkbackPlaceholder);
-
-
innerSource((.signal) => {
-
switch (signal) {
-
| End => {
-
state.innerTalkbacks = Rebel.Array.filter(state.innerTalkbacks, x => x !== talkback^);
-
if (state.ended && Rebel.Array.size(state.innerTalkbacks) === 0) {
-
sink(.End);
-
}
-
}
-
| Start(tb) => {
-
talkback := tb;
-
state.innerTalkbacks = Rebel.Array.append(state.innerTalkbacks, tb);
-
tb(.Pull);
-
}
-
| Push(x) when Rebel.Array.size(state.innerTalkbacks) !== 0 => {
-
sink(.Push(x));
-
talkback^(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| End when !state.ended => {
-
state.ended = true;
-
if (Rebel.Array.size(state.innerTalkbacks) === 0) {
-
sink(.End);
-
}
-
}
-
| End => ()
-
| Start(tb) => {
-
state.outerTalkback = tb;
-
tb(.Pull);
-
}
-
| Push(x) when !state.ended => {
-
applyInnerSource(f(.x));
-
state.outerTalkback(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close when !state.ended => {
-
state.ended = true;
-
state.outerTalkback(.Close);
-
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close));
-
state.innerTalkbacks = Rebel.Array.makeEmpty();
-
}
-
| Close => ()
-
| Pull when !state.ended =>
-
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull));
-
| Pull => ()
-
}
-
}));
-
}));
-
-
let merge = sources => mergeMap((.x) => x, fromArray(sources));
-
let mergeAll = source => mergeMap((.x) => x, source);
-
let flatten = mergeAll;
-
-
type concatMapStateT('a) = {
-
inputQueue: Rebel.MutableQueue.t('a),
-
mutable outerTalkback: (.talkbackT) => unit,
-
mutable innerTalkback: (.talkbackT) => unit,
-
mutable innerActive: bool,
-
mutable closed: bool,
-
mutable ended: bool
-
};
-
-
let concatMap = f => curry(source => curry(sink => {
-
let state: concatMapStateT('a) = {
-
inputQueue: Rebel.MutableQueue.make(),
-
outerTalkback: talkbackPlaceholder,
-
innerTalkback: talkbackPlaceholder,
-
innerActive: false,
-
closed: false,
-
ended: false
-
};
-
-
let rec applyInnerSource = innerSource =>
-
innerSource((.signal) => {
-
switch (signal) {
-
| End => {
-
state.innerActive = false;
-
state.innerTalkback = talkbackPlaceholder;
-
-
switch (Rebel.MutableQueue.pop(state.inputQueue)) {
-
| Some(input) => applyInnerSource(f(.input))
-
| None when state.ended => sink(.End)
-
| None => ()
-
};
-
}
-
| Start(tb) => {
-
state.innerActive = true;
-
state.innerTalkback = tb;
-
tb(.Pull);
-
}
-
| Push(x) when !state.closed => {
-
sink(.Push(x));
-
state.innerTalkback(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
-
source((.signal) => {
-
switch (signal) {
-
| End when !state.ended => {
-
state.ended = true;
-
if (!state.innerActive && Rebel.MutableQueue.isEmpty(state.inputQueue)) {
-
sink(.End);
-
}
-
}
-
| End => ()
-
| Start(tb) => {
-
state.outerTalkback = tb;
-
tb(.Pull);
-
}
-
| Push(x) when !state.ended => {
-
if (state.innerActive) {
-
Rebel.MutableQueue.add(state.inputQueue, x);
-
} else {
-
applyInnerSource(f(.x));
-
}
-
-
state.outerTalkback(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Pull => if (!state.ended) state.innerTalkback(.Pull)
-
| Close when !state.ended => {
-
state.ended = true;
-
state.closed = true;
-
state.outerTalkback(.Close);
-
state.innerTalkback(.Close);
-
}
-
| Close => ()
-
}
-
}));
-
}));
-
-
let concatAll = source => concatMap((.x) => x, source);
-
let concat = sources => concatMap((.x) => x, fromArray(sources));
-
-
type switchMapStateT('a) = {
-
mutable outerTalkback: (.talkbackT) => unit,
-
mutable innerTalkback: (.talkbackT) => unit,
-
mutable innerActive: bool,
-
mutable closed: bool,
-
mutable ended: bool
-
};
-
-
let switchMap = f => curry(source => curry(sink => {
-
let state: switchMapStateT('a) = {
-
outerTalkback: talkbackPlaceholder,
-
innerTalkback: talkbackPlaceholder,
-
innerActive: false,
-
closed: false,
-
ended: false
-
};
-
-
let applyInnerSource = innerSource =>
-
innerSource((.signal) => {
-
switch (signal) {
-
| End => {
-
state.innerActive = false;
-
state.innerTalkback = talkbackPlaceholder;
-
if (state.ended) sink(.End);
-
}
-
| Start(tb) => {
-
state.innerActive = true;
-
state.innerTalkback = tb;
-
tb(.Pull);
-
}
-
| Push(x) when !state.closed => {
-
sink(.Push(x));
-
state.innerTalkback(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
-
source((.signal) => {
-
switch (signal) {
-
| End when !state.ended => {
-
state.ended = true;
-
if (!state.innerActive) sink(.End);
-
}
-
| End => ()
-
| Start(tb) => {
-
state.outerTalkback = tb;
-
tb(.Pull);
-
}
-
| Push(x) when !state.ended => {
-
if (state.innerActive) {
-
state.innerTalkback(.Close);
-
state.innerTalkback = talkbackPlaceholder;
-
}
-
applyInnerSource(f(.x));
-
state.outerTalkback(.Pull);
-
}
-
| Push(_) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Pull => state.innerTalkback(.Pull)
-
| Close when !state.ended => {
-
state.ended = true;
-
state.closed = true;
-
state.outerTalkback(.Close);
-
state.innerTalkback(.Close);
-
state.innerTalkback = talkbackPlaceholder;
-
}
-
| Close => ()
-
}
-
}));
-
}));
-
-
type shareStateT('a) = {
-
mutable sinks: Rebel.Array.t(sinkT('a)),
-
mutable talkback: (.talkbackT) => unit,
-
mutable gotSignal: bool
-
};
-
-
let share = source => {
-
let state = {
-
sinks: Rebel.Array.makeEmpty(),
-
talkback: talkbackPlaceholder,
-
gotSignal: false
-
};
-
-
sink => {
-
state.sinks = Rebel.Array.append(state.sinks, sink);
-
-
if (Rebel.Array.size(state.sinks) === 1) {
-
source((.signal) => {
-
switch (signal) {
-
| Push(_) => {
-
state.gotSignal = false;
-
Rebel.Array.forEach(state.sinks, sink => sink(.signal));
-
}
-
| Start(x) => state.talkback = x
-
| End => {
-
Rebel.Array.forEach(state.sinks, sink => sink(.End));
-
state.sinks = Rebel.Array.makeEmpty();
-
}
-
}
-
});
-
};
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => {
-
state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
-
if (Rebel.Array.size(state.sinks) === 0) {
-
state.talkback(.Close);
-
};
-
}
-
| Pull when !state.gotSignal => {
-
state.gotSignal = true;
-
state.talkback(.signal);
-
}
-
| Pull => ()
-
}
-
}));
-
}
-
};
-
-
type combineStateT('a, 'b) = {
-
mutable talkbackA: (.talkbackT) => unit,
-
mutable talkbackB: (.talkbackT) => unit,
-
mutable lastValA: option('a),
-
mutable lastValB: option('b),
-
mutable gotSignal: bool,
-
mutable endCounter: int,
-
mutable ended: bool,
-
};
-
-
let combine = (sourceA, sourceB) => curry(sink => {
-
let state = {
-
talkbackA: talkbackPlaceholder,
-
talkbackB: talkbackPlaceholder,
-
lastValA: None,
-
lastValB: None,
-
gotSignal: false,
-
endCounter: 0,
-
ended: false
-
};
-
-
sourceA((.signal) => {
-
switch (signal, state.lastValB) {
-
| (Start(tb), _) => state.talkbackA = tb
-
| (Push(a), None) => {
-
state.lastValA = Some(a);
-
state.gotSignal = false;
-
}
-
| (Push(a), Some(b)) when !state.ended => {
-
state.lastValA = Some(a);
-
state.gotSignal = false;
-
sink(.Push((a, b)));
-
}
-
| (End, _) when state.endCounter < 1 =>
-
state.endCounter = state.endCounter + 1
-
| (End, _) when !state.ended => {
-
state.ended = true;
-
sink(.End);
-
}
-
| _ => ()
-
}
-
});
-
-
sourceB((.signal) => {
-
switch (signal, state.lastValA) {
-
| (Start(tb), _) => state.talkbackB = tb
-
| (Push(b), None) => {
-
state.lastValB = Some(b);
-
state.gotSignal = false;
-
}
-
| (Push(b), Some(a)) when !state.ended => {
-
state.lastValB = Some(b);
-
state.gotSignal = false;
-
sink(.Push((a, b)));
-
}
-
| (End, _) when state.endCounter < 1 =>
-
state.endCounter = state.endCounter + 1
-
| (End, _) when !state.ended => {
-
state.ended = true;
-
sink(.End);
-
}
-
| _ => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
if (!state.ended) {
-
switch (signal) {
-
| Close => {
-
state.ended = true;
-
state.talkbackA(.Close);
-
state.talkbackB(.Close);
-
}
-
| Pull when !state.gotSignal => {
-
state.gotSignal = true;
-
state.talkbackA(.signal);
-
state.talkbackB(.signal);
-
}
-
| Pull => ()
-
}
-
};
-
}));
-
});
-
-
type takeStateT = {
-
mutable taken: int,
-
mutable talkback: (.talkbackT) => unit
-
};
-
-
let take = max => curry(source => curry(sink => {
-
let state: takeStateT = {
-
taken: 0,
-
talkback: talkbackPlaceholder
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => state.talkback = tb;
-
| Push(_) when state.taken < max => {
-
state.taken = state.taken + 1;
-
sink(.signal);
-
-
if (state.taken === max) {
-
sink(.End);
-
state.talkback(.Close);
-
};
-
}
-
| Push(_) => ()
-
| End when state.taken < max => {
-
state.taken = max;
-
sink(.End)
-
}
-
| End => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
if (state.taken < max) {
-
switch (signal) {
-
| Pull => state.talkback(.Pull);
-
| Close => {
-
state.taken = max;
-
state.talkback(.Close);
-
}
-
}
-
};
-
}));
-
}));
-
-
let takeLast = max => curry(source => curry(sink => {
-
open Rebel;
-
let queue = MutableQueue.make();
-
-
captureTalkback(source, (.signal, talkback) => {
-
switch (signal) {
-
| Start(_) => talkback(.Pull)
-
| Push(x) => {
-
let size = MutableQueue.size(queue);
-
if (size >= max && max > 0) {
-
ignore(MutableQueue.pop(queue));
-
};
-
-
MutableQueue.add(queue, x);
-
talkback(.Pull);
-
}
-
| End => makeTrampoline(sink, (.) => MutableQueue.pop(queue))
-
}
-
});
-
}));
-
-
let takeWhile = f => curry(source => curry(sink => {
-
let ended = ref(false);
-
let talkback = ref(talkbackPlaceholder);
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => {
-
talkback := tb;
-
sink(.signal);
-
}
-
| End when !ended^ => {
-
ended := true;
-
sink(.End);
-
}
-
| End => ()
-
| Push(x) when !ended^ => {
-
if (!f(.x)) {
-
ended := true;
-
sink(.End);
-
talkback^(.Close);
-
} else {
-
sink(.signal);
-
};
-
}
-
| Push(_) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
if (!ended^) {
-
switch (signal) {
-
| Pull => talkback^(.Pull);
-
| Close => {
-
ended := true;
-
talkback^(.Close);
-
}
-
}
-
};
-
}));
-
}));
-
-
type takeUntilStateT = {
-
mutable ended: bool,
-
mutable sourceTalkback: (.talkbackT) => unit,
-
mutable notifierTalkback: (.talkbackT) => unit
-
};
-
-
let takeUntil = notifier => curry(source => curry(sink => {
-
let state: takeUntilStateT = {
-
ended: false,
-
sourceTalkback: talkbackPlaceholder,
-
notifierTalkback: talkbackPlaceholder
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => {
-
state.sourceTalkback = tb;
-
-
notifier((.signal) => {
-
switch (signal) {
-
| Start(innerTb) => {
-
state.notifierTalkback = innerTb;
-
innerTb(.Pull);
-
}
-
| Push(_) => {
-
state.ended = true;
-
state.notifierTalkback(.Close);
-
state.sourceTalkback(.Close);
-
sink(.End);
-
}
-
| End => ()
-
}
-
});
-
}
-
| End when !state.ended => {
-
state.notifierTalkback(.Close);
-
state.ended = true;
-
sink(.End);
-
}
-
| End => ()
-
| Push(_) when !state.ended => sink(.signal)
-
| Push(_) => ()
-
}
-
});
-
-
sink(.Start((.signal) => {
-
if (!state.ended) {
-
switch (signal) {
-
| Close => {
-
state.sourceTalkback(.Close);
-
state.notifierTalkback(.Close);
-
}
-
| Pull => state.sourceTalkback(.Pull)
-
}
-
};
-
}));
-
}));
-
-
let skip = wait => curry(source => curry(sink => {
-
let rest = ref(wait);
-
-
captureTalkback(source, (.signal, talkback) => {
-
switch (signal) {
-
| Push(_) when rest^ > 0 => {
-
rest := rest^ - 1;
-
talkback(.Pull);
-
}
-
| _ => sink(.signal)
-
}
-
});
-
}));
-
-
let skipWhile = f => curry(source => curry(sink => {
-
let skip = ref(true);
-
-
captureTalkback(source, (.signal, talkback) => {
-
switch (signal) {
-
| Push(x) when skip^ => {
-
if (f(.x)) {
-
talkback(.Pull);
-
} else {
-
skip := false;
-
sink(.signal);
-
};
-
}
-
| _ => sink(.signal)
-
}
-
});
-
}));
-
-
type skipUntilStateT = {
-
mutable skip: bool,
-
mutable ended: bool,
-
mutable gotSignal: bool,
-
mutable sourceTalkback: (.talkbackT) => unit,
-
mutable notifierTalkback: (.talkbackT) => unit
-
};
-
-
let skipUntil = notifier => curry(source => curry(sink => {
-
let state: skipUntilStateT = {
-
skip: true,
-
ended: false,
-
gotSignal: false,
-
sourceTalkback: talkbackPlaceholder,
-
notifierTalkback: talkbackPlaceholder
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(tb) => {
-
state.sourceTalkback = tb;
-
-
notifier((.signal) => {
-
switch (signal) {
-
| Start(innerTb) => {
-
state.notifierTalkback = innerTb;
-
innerTb(.Pull);
-
tb(.Pull);
-
}
-
| Push(_) => {
-
state.skip = false;
-
state.notifierTalkback(.Close);
-
}
-
| End => ()
-
}
-
});
-
}
-
| Push(_) when state.skip && !state.ended => state.sourceTalkback(.Pull)
-
| Push(_) when !state.ended => {
-
state.gotSignal = false;
-
sink(.signal)
-
}
-
| Push(_) => ()
-
| End => {
-
if (state.skip) state.notifierTalkback(.Close);
-
state.ended = true;
-
sink(.End)
-
}
-
}
-
});
-
-
sink(.Start((.signal) => {
-
switch (signal) {
-
| Close => {
-
if (state.skip) state.notifierTalkback(.Close);
-
state.ended = true;
-
state.sourceTalkback(.Close);
-
}
-
| Pull when !state.gotSignal && !state.ended => {
-
state.gotSignal = true;
-
state.sourceTalkback(.Pull);
-
}
-
| Pull => ()
-
}
-
}));
-
}));
-
-
type publishStateT = {
-
mutable talkback: (.talkbackT) => unit,
-
mutable ended: bool
-
};
-
-
let publish = source => {
-
let state: publishStateT = {
-
talkback: talkbackPlaceholder,
-
ended: false
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(x) => {
-
state.talkback = x;
-
x(.Pull);
-
}
-
| Push(_) => if (!state.ended) state.talkback(.Pull);
-
| End => state.ended = true;
-
}
-
});
-
-
{
-
unsubscribe: () =>
-
if (!state.ended) {
-
state.ended = true;
-
state.talkback(.Close);
-
}
-
}
-
};
-
-
let subscribe = f => curry(source => {
-
let state: publishStateT = {
-
talkback: talkbackPlaceholder,
-
ended: false
-
};
-
-
source((.signal) => {
-
switch (signal) {
-
| Start(x) => {
-
state.talkback = x;
-
x(.Pull);
-
}
-
| Push(x) when !state.ended => {
-
f(.x);
-
state.talkback(.Pull);
-
}
-
| Push(_) => ()
-
| End => state.ended = true;
-
}
-
});
-
-
{
-
unsubscribe: () =>
-
if (!state.ended) {
-
state.ended = true;
-
state.talkback(.Close);
-
}
-
}
-
});
-
-
let forEach = f => curry(source => {
-
ignore(subscribe(f, source));
-
});
-175
src/wonka.rei
···
-
open Wonka_types;
-
-
module Types = Wonka_types;
-
-
/* -- subject factory */
-
-
let makeSubject: unit => subjectT('a);
-
-
/* -- source factories */
-
-
/* When constructed, calls a function that receives an observer
-
and creates a push-based stream of events. This is useful
-
for constructing any kind of asynchronous stream. The return
-
callback from the passed observer function will be called when
-
the stream is closed or ends */
-
let make: ((.observerT('a)) => teardownT, sinkT('a)) => unit;
-
-
/* Accepts a list and creates a pullable source for that list.
-
The source will emit events when being pulled until the list
-
is exhausted and it completes */
-
let fromList: (list('a), sinkT('a)) => unit;
-
-
/* Accepts an array and creates a pullable source for that array.
-
The source will emit events when being pulled until the array
-
is exhausted and it completes */
-
let fromArray: (array('a), sinkT('a)) => unit;
-
-
/* Accepts a value and creates a pullable source emitting just
-
that single value. */
-
let fromValue: ('a, sinkT('a)) => unit;
-
-
/* A source that ends immediately */
-
let empty: (sinkT('a)) => unit;
-
-
/* A source that never ends or emits a value */
-
let never: (sinkT('a)) => unit;
-
-
/* -- operators */
-
-
/* Takes a callback and a source, and creates a sink & source.
-
The callback will be called for each value that it receives */
-
let tap: ((.'a) => unit, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a mapping function from one type to another, and a source,
-
and creates a sink & source.
-
All values that it receives will be transformed using the mapping
-
function and emitted on the new source */
-
let map: ((.'a) => 'b, sourceT('a), sinkT('b)) => unit;
-
-
/* Takes a predicate function returning a boolean, and a source,
-
and creates a sink & source.
-
All values that it receives will be filtered using the predicate,
-
and only truthy values will be passed on to the new source.
-
The sink will attempt to pull a new value when one was filtered. */
-
let filter: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a reducer function, a seed value, and a source, and creates
-
a sink & source.
-
The last returned value from the reducer function (or the seed value
-
initially) will be passed to the reducer together with the value
-
that the sink receives. All return values of the reducer function
-
are emitted on the new source. */
-
let scan: (('b, 'a) => 'b, 'b, sourceT('a), sinkT('b)) => unit;
-
-
/* Takes a mapping function from one types to a source output,
-
and a source, and creates a sink & source.
-
The mapping function is called with each value it receives and
-
the resulting inner source is merged into the output source. */
-
let mergeMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
-
-
/* Takes a mapping function from one types to a source output,
-
and a source, and creates a sink & source.
-
The mapping function is called with each value it receives and
-
the latest inner source is merged into the output source. When
-
a new value comes in the previous source is dicarded. */
-
let switchMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
-
-
/* Takes a mapping function from one types to a source output,
-
and a source, and creates a sink & source.
-
The mapping function is called with each value it receives and
-
the resulting inner sources are subscribed to and piped through
-
to the output source in order. */
-
let concatMap: ((.'a) => sourceT('b), sourceT('a), sinkT('b)) => unit;
-
-
/* Takes an array of sources and creates a sink & source.
-
All values that the sink receives from all sources will be passed through
-
to the new source. */
-
let merge: (array(sourceT('a)), sinkT('a)) => unit;
-
-
/* Takes an array of sources and creates a sink & source.
-
The values from each sources will be emitted on the sink, one after another.
-
When one source ends, the next one will start. */
-
let concat: (array(sourceT('a)), sinkT('a)) => unit;
-
-
/* Works the same as mergeMap but as an operator on a source
-
of nested sources. */
-
let mergeAll: (sourceT(sourceT('a)), sinkT('a)) => unit;
-
let flatten: (sourceT(sourceT('a)), sinkT('a)) => unit;
-
-
/* Works the same as concatMap but as an operator on a source
-
of nested sources. */
-
let concatAll: (sourceT(sourceT('a)), sinkT('a)) => unit;
-
-
/* Takes a listenable or a pullable source and creates a new source that
-
will ensure that the source is shared between all sinks that follow.
-
Essentially the original source will only be created once. */
-
let share: (sourceT('a), sinkT('a)) => unit;
-
-
/* Takes two sources and creates a new source & sink.
-
All latest values from both sources will be passed through as a
-
tuple of the last values that have been observed */
-
let combine: (sourceT('a), sourceT('b), sinkT(('a, 'b))) => unit;
-
-
/* Takes a max number and a source, and creates a sink & source.
-
It will emit values that the sink receives until the passed maximum number
-
of values is reached, at which point it will end the source and the
-
returned, new source. */
-
let take: (int, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a max number and a source, and creates a sink & source.
-
It will pull values and add them to a queue limiting its size to the passed
-
number until the source ends. It will then proceed to emit
-
the cached values on the new source as a pullable. */
-
let takeLast: (int, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a predicate and a source, and creates a sink & source.
-
It will emit values that the sink receives until the predicate returns false
-
for a value, at which point it will end the source and the returned, new
-
source. */
-
let takeWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a notifier source and an input source, and creates a sink & source.
-
It will not emit values that the sink receives until the notifier source
-
It will emit values that the sink receives until the notifier source
-
emits a value, at which point it will end the source and the returned, new
-
source. */
-
let takeUntil: (
-
sourceT('a),
-
sourceT('b),
-
sinkT('b)
-
) => unit;
-
-
/* Takes a number and a source, and creates a sink & source.
-
It will not emit values that the sink receives until the passed number
-
of values is reached, at which point it will start acting like a noop
-
operator, passing through every signal. */
-
let skip: (int, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a predicate and a source, and creates a sink & source.
-
It will not emit values that the sink receives until the passed predicate
-
returns false for a value, at which point it will start acting like a noop
-
operator, passing through every signal. */
-
let skipWhile: ((.'a) => bool, sourceT('a), sinkT('a)) => unit;
-
-
/* Takes a notifier source and an input source, and creates a sink & source.
-
It will not emit values that the sink receives until the notifier source
-
emits a value, at which point it will start acting like a noop
-
operator, passing through every signal. */
-
let skipUntil: (sourceT('a), sourceT('b), sinkT('b)) => unit;
-
-
/* -- sink factories */
-
-
/* Accepts a source and returns a subscription, but does otherwise not surface values */
-
let publish: sourceT('a) => subscriptionT;
-
-
/* Takes a function and a source, and creates a sink.
-
The function will be called for each value that the sink receives.
-
The sink will attempt to pull new values as values come in, until
-
the source ends. */
-
let forEach: ((.'a) => unit, sourceT('a)) => unit;
-
-
/* Similar to the `forEach` sink factory, but returns an anonymous function
-
that when called will end the stream immediately.
-
Ending the stream will propagate an End signal upwards to the root source. */
-
let subscribe: ((.'a) => unit, sourceT('a)) => subscriptionT;