Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add {from,to}Observable and {from,to}Callbag (#31)

* Implement fromObservable operator

* Add toObservable and refactor

* Rename symbol to observableSymbol

* Support Observables without Symbol.observable

* Add fromCallbag and toCallbag

* Support synchronous observables

* Add tests for fromObservable and fromCallbag

* Fix Observable.symbol creation and toObservable

* Fix method usage on Symbol.observable

`this` was not respected since the method was separated
from the input observable.

* Add tests for toObservable and toCallbag

* Revert changes to fromObservable

+83
__tests__/wonka_test.re
···
expect(ended^) === false;
});
});
+
+
describe("fromObservable", () =>
+
Expect.(
+
testPromise("creates a source from an observable", () => {
+
let observable = Wonka_thelpers.observableFromArray([|1, 2|]);
+
+
Wonka_thelpers.testSource(Wonka.fromObservable(observable))
+
|> Js.Promise.then_(x =>
+
expect(x)
+
|> toEqual([|Push(1), Push(2), End|])
+
|> Js.Promise.resolve
+
);
+
})
+
)
+
);
+
+
describe("fromCallbag", () => {
+
open Expect;
+
+
testPromise("creates a source from a callbag (observable)", () => {
+
let observable = Wonka_thelpers.observableFromArray([|1, 2|]);
+
let callbag = Wonka_thelpers.callbagFromObservable(observable);
+
+
Wonka_thelpers.testSource(Wonka.fromCallbag(callbag))
+
|> Js.Promise.then_(x =>
+
expect(x)
+
|> toEqual([|Push(1), Push(2), End|])
+
|> Js.Promise.resolve
+
);
+
});
+
+
testPromise("creates a source from a callbag (iterable)", () => {
+
let callbag = Wonka_thelpers.callbagFromArray([|1, 2|]);
+
+
Wonka_thelpers.testSource(Wonka.fromCallbag(callbag))
+
|> Js.Promise.then_(x =>
+
expect(x)
+
|> toEqual([|Push(1), Push(2), End|])
+
|> Js.Promise.resolve
+
);
+
});
+
});
});
describe("operator factories", () => {
···
expect(nums) |> toEqual([|0, 1|]);
},
+
)
+
);
+
+
describe("toObservable", () =>
+
Expect.(
+
testPromise("should convert a source to an Observable", () => {
+
let source = Wonka.fromArray([|1, 2|]);
+
let observable =
+
Wonka.toObservable(source) |> Wonka_thelpers.observableFrom;
+
let values = [||];
+
+
let promise =
+
observable->Wonka_thelpers.observableForEach(value =>
+
ignore(Js.Array.push(value, values))
+
);
+
+
promise
+
|> Js.Promise.then_(() =>
+
expect(values) |> toEqual([|1, 2|]) |> Js.Promise.resolve
+
);
+
})
+
)
+
);
+
+
describe("toCallbag", () =>
+
Expect.(
+
it("should convert a source to a Callbag", () => {
+
let source = Wonka.fromArray([|1, 2|]);
+
let callbag = Wonka.toCallbag(source);
+
let values = [||];
+
+
(
+
Wonka_thelpers.callbagIterate(. value =>
+
ignore(Js.Array.push(value, values))
+
)
+
)(.
+
callbag,
+
);
+
+
expect(values) |> toEqual([|1, 2|]);
+
})
);
});
+54
__tests__/wonka_thelpers.re
···
);
});
};
+
+
let testSource = source => {
+
let talkback = ref((. _: talkbackT) => ());
+
let res = [||];
+
+
Js.Promise.make((~resolve, ~reject as _) =>
+
source((. signal) =>
+
switch (signal) {
+
| Start(x) =>
+
talkback := x;
+
talkback^(. Pull);
+
| Push(_) =>
+
ignore(Js.Array.push(signal, res));
+
talkback^(. Pull);
+
| End =>
+
ignore(Js.Array.push(signal, res));
+
resolve(. res);
+
}
+
)
+
);
+
};
+
+
type observableClassT;
+
+
[@bs.module] external observableClass: observableClassT = "zen-observable";
+
[@bs.send]
+
external _observableFromArray:
+
(observableClassT, array('a)) => Wonka.observableT('a) =
+
"from";
+
[@bs.send]
+
external _observableFrom:
+
(observableClassT, Wonka.observableT('a)) => Wonka.observableT('a) =
+
"from";
+
[@bs.send]
+
external observableForEach:
+
(Wonka.observableT('a), 'a => unit) => Js.Promise.t(unit) =
+
"forEach";
+
+
let observableFromArray = (arr: array('a)): Wonka.observableT('a) =>
+
_observableFromArray(observableClass, arr);
+
let observableFrom = (obs: Wonka.observableT('a)): Wonka.observableT('a) =>
+
_observableFrom(observableClass, obs);
+
+
[@bs.module]
+
external callbagFromArray: array('a) => Wonka.callbagT('a) =
+
"callbag-from-iter";
+
+
[@bs.module]
+
external callbagFromObservable: Wonka.observableT('a) => Wonka.callbagT('a) =
+
"callbag-from-obs";
+
+
[@bs.module]
+
external callbagIterate: (. ('a => unit)) => (. Wonka.callbagT('a)) => unit =
+
"callbag-iterate";
+5 -1
package.json
···
"devDependencies": {
"@glennsl/bs-jest": "^0.4.8",
"bs-platform": "^5.0.4",
+
"callbag-from-iter": "^1.2.0",
+
"callbag-from-obs": "^1.2.0",
+
"callbag-iterate": "^1.0.0",
"codecov": "^3.5.0",
"flowgen": "^1.8.0",
"gatsby": "^2.11.0",
···
"rollup-plugin-commonjs": "^9.3.4",
"rollup-plugin-node-resolve": "^4.2.3",
"rollup-plugin-prettier": "^0.6.0",
-
"rollup-plugin-terser": "^4.0.4"
+
"rollup-plugin-terser": "^4.0.4",
+
"zen-observable": "^0.8.14"
},
"lint-staged": {
"*.{d.ts,js}": [
+4
src/web/wonkaJs.d.ts
···
export * from './wonka_source_fromDomEvent';
export * from './wonka_source_fromListener';
export * from './wonka_source_fromPromise';
+
+
/* wrappers */
+
export * from './wonka_observable';
+
export * from './wonka_callbag';
+4
src/web/wonkaJs.re
···
include Wonka_source_fromDomEvent;
include Wonka_source_fromListener;
include Wonka_source_fromPromise;
+
+
/* wrappers */
+
include Wonka_observable;
+
include Wonka_callbag;
+10
src/web/wonka_callbag.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export type Callbag<I, O> = {
+
(t: 0, d: Callbag<O, I>): void;
+
(t: 1, d: I): void;
+
(t: 2, d?: any): void;
+
};
+
+
export const fromCallbag: <T>(callbag: Callbag<void, T>) => Source<T>;
+
export const toCallbag: <T>(source: Source<T>) => Callbag<void, T>;
+57
src/web/wonka_callbag.re
···
+
open Wonka_types;
+
+
type callbagSignal =
+
| CALLBAG_START /* 0 */
+
| CALLBAG_DATA /* 1 */
+
| CALLBAG_END /* 2 */;
+
+
type callbagData('a);
+
type callbagTalkback = (. callbagSignal) => unit;
+
type callbagT('a) = (. callbagSignal, callbagData('a)) => unit;
+
+
external unsafe_getCallbag: callbagData('a) => callbagT('a) = "%identity";
+
external unsafe_getTalkback: callbagData('a) => callbagTalkback = "%identity";
+
external unsafe_getValue: callbagData('a) => 'a = "%identity";
+
external unsafe_wrap: 'any => callbagData('a) = "%identity";
+
+
let fromCallbag = callbag =>
+
curry(sink => {
+
let wrappedSink =
+
(. signal, data) =>
+
switch (signal) {
+
| CALLBAG_START =>
+
let talkback = unsafe_getTalkback(data);
+
let wrappedTalkback = (
+
(. talkbackSignal: talkbackT) =>
+
switch (talkbackSignal) {
+
| Pull => talkback(. CALLBAG_DATA)
+
| Close => talkback(. CALLBAG_END)
+
}
+
);
+
sink(. Start(wrappedTalkback));
+
| CALLBAG_DATA => sink(. Push(unsafe_getValue(data)))
+
| CALLBAG_END => sink(. End)
+
};
+
callbag(. CALLBAG_START, unsafe_wrap(wrappedSink));
+
});
+
+
let toCallbag = source =>
+
curry((. signal, data) =>
+
if (signal === CALLBAG_START) {
+
let callbag = unsafe_getCallbag(data);
+
source((. signal) =>
+
switch (signal) {
+
| Start(talkbackFn) =>
+
let wrappedTalkbackFn = (talkback: callbagSignal) =>
+
switch (talkback) {
+
| CALLBAG_START => ()
+
| CALLBAG_DATA => talkbackFn(. Pull)
+
| CALLBAG_END => talkbackFn(. Close)
+
};
+
callbag(. CALLBAG_START, unsafe_wrap(wrappedTalkbackFn));
+
| Push(data) => callbag(. CALLBAG_DATA, unsafe_wrap(data))
+
| End => callbag(. CALLBAG_END, unsafe_wrap())
+
}
+
);
+
}
+
);
+6
src/web/wonka_callbag.rei
···
+
open Wonka_types;
+
+
type callbagT('a);
+
+
let fromCallbag: (callbagT('a), sinkT('a)) => unit;
+
let toCallbag: sourceT('a) => callbagT('a);
+18
src/web/wonka_observable.d.ts
···
+
import { Source } from '../wonka_types';
+
+
export interface Subscription {
+
unsubscribe(): void;
+
}
+
+
export interface Observer<T> {
+
next(value: T): void;
+
error(errorValue: any): void;
+
complete(): void;
+
}
+
+
export interface Observable<T> {
+
subscribe(observer: Observer<T>): Subscription;
+
}
+
+
export const fromObservable: <T>(observable: Observable<T>) => Source<T>;
+
export const toObservable: <T>(source: Source<T>) => Observable<T>;
+117
src/web/wonka_observable.re
···
+
open Wonka_types;
+
open Wonka_helpers;
+
+
let observableSymbol: string = [%raw
+
{|
+
typeof Symbol === 'function'
+
? Symbol.observable || (Symbol.observable = Symbol('observable'))
+
: '@@observable'
+
|}
+
];
+
+
type subscriptionT = {. [@bs.meth] "unsubscribe": unit => unit};
+
+
type observerT('a) = {
+
.
+
[@bs.meth] "next": 'a => unit,
+
[@bs.meth] "error": Js.Exn.t => unit,
+
[@bs.meth] "complete": unit => unit,
+
};
+
+
type observableT('a) = {
+
.
+
[@bs.meth] "subscribe": observerT('a) => subscriptionT,
+
};
+
+
type observableFactoryT('a) = (. unit) => observableT('a);
+
+
[@bs.get_index]
+
external observable_get:
+
(observableT('a), string) => option(observableFactoryT('a)) =
+
"";
+
[@bs.get_index]
+
external observable_unsafe_get:
+
(observableT('a), string) => observableFactoryT('a) =
+
"";
+
[@bs.set_index]
+
external observable_set:
+
(observableT('a), string, unit => observableT('a)) => unit =
+
"";
+
+
let fromObservable = (input: observableT('a)): sourceT('a) => {
+
let observable =
+
switch (input->observable_get(observableSymbol)) {
+
| Some(_) => (input->observable_unsafe_get(observableSymbol))(.)
+
| None => input
+
};
+
+
curry(sink => {
+
let observer: observerT('a) =
+
[@bs]
+
{
+
as _;
+
pub next = value => sink(. Push(value));
+
pub complete = () => sink(. End);
+
pub error = _ => ()
+
};
+
+
let subscription = observable##subscribe(observer);
+
+
sink(.
+
Start(
+
(. signal) =>
+
switch (signal) {
+
| Close => subscription##unsubscribe()
+
| _ => ()
+
},
+
),
+
);
+
});
+
};
+
+
type observableStateT = {
+
mutable talkback: (. talkbackT) => unit,
+
mutable ended: bool,
+
};
+
+
let toObservable = (source: sourceT('a)): observableT('a) => {
+
let observable: observableT('a) =
+
[@bs]
+
{
+
as _;
+
pub subscribe = (observer: observerT('a)): subscriptionT => {
+
let state: observableStateT = {
+
talkback: talkbackPlaceholder,
+
ended: false,
+
};
+
+
source((. signal) =>
+
switch (signal) {
+
| Start(x) =>
+
state.talkback = x;
+
x(. Pull);
+
| Push(x) when !state.ended =>
+
observer##next(x);
+
state.talkback(. Pull);
+
| Push(_) => ()
+
| End =>
+
state.ended = true;
+
observer##complete();
+
}
+
);
+
+
[@bs]
+
{
+
as _;
+
pub unsubscribe = () =>
+
if (!state.ended) {
+
state.ended = true;
+
state.talkback(. Close);
+
}
+
};
+
}
+
};
+
+
observable->observable_set(observableSymbol, () => observable);
+
observable;
+
};
+6
src/web/wonka_observable.rei
···
+
open Wonka_types;
+
+
type observableT('a);
+
+
let fromObservable: (observableT('a), sinkT('a)) => unit;
+
let toObservable: sourceT('a) => observableT('a);
+22
yarn.lock
···
resolved "https://registry.yarnpkg.com/call-me-maybe/-/call-me-maybe-1.0.1.tgz#26d208ea89e37b5cbde60250a15f031c16a4d66b"
integrity sha1-JtII6onje1y95gJQoV8DHBak1ms=
+
callbag-from-iter@^1.2.0:
+
version "1.2.0"
+
resolved "https://registry.yarnpkg.com/callbag-from-iter/-/callbag-from-iter-1.2.0.tgz#c1886b08a447cd2efd9a140ec11743d705f26ca9"
+
integrity sha512-9rWvHOnRGp01YMRHHwgVZOO1vu4IRR8GcoH3FpSB16AMzum5juFWJPCMX/XnkJ9j6cic/G+kvb1Grvi6IuSmIQ==
+
+
callbag-from-obs@^1.2.0:
+
version "1.2.0"
+
resolved "https://registry.yarnpkg.com/callbag-from-obs/-/callbag-from-obs-1.2.0.tgz#f092f302f302b53abaf1a4d7a5393f9a65fac517"
+
integrity sha512-InhdPC6P4Gdpg7nuXSkocDFlb+//sbwCrVCYhxOHhSVm1gDcw/zSA+IF1gHdYtk4RQKKaCymUFCkVVUVSRThVQ==
+
dependencies:
+
symbol-observable "^1.2.0"
+
+
callbag-iterate@^1.0.0:
+
version "1.0.0"
+
resolved "https://registry.yarnpkg.com/callbag-iterate/-/callbag-iterate-1.0.0.tgz#97116f09296ef2d5073b35125891dca93349aeeb"
+
integrity sha512-bynCbDuqGZkj1mXAhGr8jMf8Vhifps+G+pF3xlcz3jcaZLNXHghVjValnJtBTg2N74cyl347UzagJ8niJpyF6Q==
+
caller-callsite@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/caller-callsite/-/caller-callsite-2.0.0.tgz#847e0fce0a223750a9a027c54b33731ad3154134"
···
semver "^5.1.0"
strip-ansi "^5.0.0"
strip-bom "^3.0.0"
+
+
zen-observable@^0.8.14:
+
version "0.8.14"
+
resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.8.14.tgz#d33058359d335bc0db1f0af66158b32872af3bf7"
+
integrity sha512-kQz39uonEjEESwh+qCi83kcC3rZJGh4mrZW7xjkSQYXkq//JZHTtKo+6yuVloTgMtzsIWOJrjIrKvk/dqm0L5g==
zwitch@^1.0.0:
version "1.0.4"