Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix spec-compliance of toObservable operator (#94)

Changed files
+35 -18
src
+35 -18
src/web/Wonka_observable.re
···
[@genType.import "../shims/Js.shim"]
type observableSubscriptionT = {. [@bs.meth] "unsubscribe": unit => unit};
+
[@bs.set_index]
+
external subscription_set: (observableSubscriptionT, string, bool) => unit;
+
[@genType.import "../shims/Js.shim"]
type observableObserverT('a) = {
.
···
[@bs.get_index]
external observable_get:
-
(observableT('a), string) => option(observableFactoryT('a)) =
-
"";
+
(observableT('a), string) => option(observableFactoryT('a));
[@bs.get_index]
external observable_unsafe_get:
-
(observableT('a), string) => observableFactoryT('a) =
-
"";
+
(observableT('a), string) => observableFactoryT('a);
[@bs.set_index]
external observable_set:
-
(observableT('a), string, unit => observableT('a)) => unit =
-
"";
+
(observableT('a), string, unit => observableT('a)) => unit;
[@genType]
let fromObservable = (input: observableT('a)): sourceT('a) => {
···
{
as _;
pub subscribe =
-
(observer: observableObserverT('a)): observableSubscriptionT => {
+
(_observer: observableObserverT('a)): observableSubscriptionT => {
+
let next: (. 'a) => unit = [%raw
+
{|
+
(typeof _observer === 'object' ? _observer.next : _observer) || function () {}
+
|}
+
];
+
+
let complete: (. unit) => unit = [%raw
+
{|
+
(typeof _observer === 'object' ? _observer.complete : arguments[2]) || function () {}
+
|}
+
];
+
let state: observableStateT = {
talkback: talkbackPlaceholder,
ended: false,
···
state.talkback = x;
x(. Pull);
| Push(x) when !state.ended =>
-
observer##next(x);
+
next(. x);
state.talkback(. Pull);
| Push(_) => ()
| End =>
state.ended = true;
-
observer##complete();
+
complete(.);
}
);
-
[@bs]
-
{
-
as _;
-
pub unsubscribe = () =>
-
if (!state.ended) {
-
state.ended = true;
-
state.talkback(. Close);
-
}
-
};
+
let subscription =
+
[@bs]
+
{
+
as self;
+
pub unsubscribe = () =>
+
if (!state.ended) {
+
self->subscription_set("closed", false);
+
state.ended = true;
+
state.talkback(. Close);
+
}
+
};
+
+
subscription->subscription_set("closed", false);
+
subscription;
}
};