Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Return unit (undefined) where possible

Changed files
+54 -39
src
+50 -36
src/Wonka_operators.re
···
ended: false,
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) =>
state.sourceTalkback = tb;
-
notifier((. signal) =>
+
notifier((. signal) => {
switch (signal) {
| Start(tb) => state.notifierTalkback = tb
| Push(_) when !state.ended =>
···
};
sink(. End);
| End => ()
-
}
-
);
+
};
+
();
+
});
| Push(value) when !state.ended =>
Rebel.MutableQueue.add(state.buffer, value);
if (!state.pulled) {
···
};
sink(. End);
| End => ()
-
}
-
);
+
};
+
();
+
});
sink(.
Start(
···
let rec applyInnerSource = innerSource => {
state.innerActive = true;
-
innerSource((. signal) =>
+
innerSource((. signal) => {
switch (signal) {
| Start(tb) =>
state.innerTalkback = tb;
···
| None => ()
};
| End => ()
-
}
-
);
+
};
+
();
+
});
+
();
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) => state.outerTalkback = tb
| Push(x) when !state.ended =>
···
sink(. End);
};
| End => ()
-
}
-
);
+
};
+
();
+
});
sink(.
Start(
···
curry(sink => {
let talkback = ref(talkbackPlaceholder);
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) =>
talkback := tb;
sink(. signal);
| Push(x) when !f(. x) => talkback^(. Pull)
| _ => sink(. signal)
-
}
-
);
+
};
+
();
+
});
})
);
···
let map = (f: (. 'a) => 'b): operatorT('a, 'b) =>
curry(source =>
curry(sink =>
-
source((. signal) =>
+
source((. signal) => {
sink(.
/* The signal needs to be recreated for genType to generate
the correct generics during codegen */
···
| End => End
},
)
-
)
+
})
)
);
···
ended := true;
sink(. signal);
| End => ()
-
}
+
};
+
();
});
})
);
···
ended: false,
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) =>
state.sourceTalkback = tb;
-
notifier((. signal) =>
+
notifier((. signal) => {
switch (signal) {
| Start(innerTb) =>
state.notifierTalkback = innerTb;
···
state.ended = true;
state.sourceTalkback(. Close);
| End => ()
-
}
-
);
+
};
+
();
+
});
| Push(_) when !state.skip && !state.ended =>
state.pulled = false;
sink(. signal);
···
};
state.ended = true;
sink(. End);
-
}
-
);
+
};
+
();
+
});
sink(.
Start(
···
};
}
);
+
();
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) => state.outerTalkback = tb
| Push(x) when !state.ended =>
···
sink(. End);
};
| End => ()
-
}
-
);
+
};
+
();
+
});
sink(.
Start(
···
talkback: talkbackPlaceholder,
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(talkback) when max <= 0 =>
talkback(. Close);
···
Rebel.MutableQueue.toArray(state.queue),
sink,
-
}
-
);
+
};
+
();
+
});
})
);
···
notifierTalkback: talkbackPlaceholder,
};
-
source((. signal) =>
+
source((. signal) => {
switch (signal) {
| Start(tb) =>
state.sourceTalkback = tb;
-
notifier((. signal) =>
+
notifier((. signal) => {
switch (signal) {
| Start(innerTb) =>
state.notifierTalkback = innerTb;
···
state.sourceTalkback(. Close);
sink(. End);
| End => ()
-
}
-
);
+
};
+
();
+
});
| End when !state.ended =>
state.ended = true;
state.notifierTalkback(. Close);
···
| End => ()
| Push(_) when !state.ended => sink(. signal)
| Push(_) => ()
-
}
-
);
+
};
+
();
+
});
sink(.
Start(
+4 -3
src/web/WonkaJs.re
···
/* sinks */
[@genType]
let toPromise = (source: sourceT('a)): Js.Promise.t('a) => {
-
Js.Promise.make((~resolve, ~reject as _) =>
+
Js.Promise.make((~resolve, ~reject as _) => {
Wonka_operators.takeLast(1, source, (. signal) =>
switch (signal) {
| Start(x) => x(. Pull)
| Push(x) => resolve(. x)
| End => ()
}
-
)
-
);
+
);
+
();
+
});
};
/* sources */