1open Wonka_types;
2open Wonka_helpers;
3
/* Key under which an object exposes its "observable" interop method.
   The raw JS below evaluates to `Symbol.observable` when `Symbol` is a
   function, creating and installing `Symbol('observable')` on the global
   `Symbol` if it is not defined yet; otherwise it falls back to the string
   '@@observable'.
   NOTE(review): the binding is annotated as `string`, but at runtime the value
   may be a JS symbol; it is only used here as an opaque index key. */
let observableSymbol: string = [%raw
  {|
  typeof Symbol === 'function'
    ? Symbol.observable || (Symbol.observable = Symbol('observable'))
    : '@@observable'
|}
];
11
/* JS object returned by `subscribe`; its `unsubscribe` method takes no
   argument and returns nothing.
   NOTE(review): the genType.import attribute presumably makes the generated
   typings take this type from "../shims/Js.shim" — confirm against the shim. */
[@genType.import "../shims/Js.shim"]
type observableSubscriptionT = {. [@bs.meth] "unsubscribe": unit => unit};
14
/* JS observer object passed to `subscribe`. It carries three methods:
   `next` receives a value of type 'a, `error` receives a `Js.Exn.t`, and
   `complete` takes no argument. */
[@genType.import "../shims/Js.shim"]
type observableObserverT('a) = {
  .
  [@bs.meth] "next": 'a => unit,
  [@bs.meth] "error": Js.Exn.t => unit,
  [@bs.meth] "complete": unit => unit,
};
22
/* JS observable object: its `subscribe` method accepts an observer and
   returns a subscription object that can be used to unsubscribe. */
[@genType.import "../shims/Js.shim"]
type observableT('a) = {
  .
  [@bs.meth] "subscribe": observableObserverT('a) => observableSubscriptionT,
};
28
/* Uncurried zero-argument function returning an observable; this is the type
   given to the value read from an object under `observableSymbol`. */
type observableFactoryT('a) = (. unit) => observableT('a);
30
/* Indexed read on an observable object with the given key, typed as an
   optional factory so callers can test whether the key is present.
   NOTE(review): relies on an absent property (undefined) being read as
   `None` — confirm against the BuckleScript version in use. */
[@bs.get_index]
external observable_get:
  (observableT('a), string) => option(observableFactoryT('a)) =
  "";
/* Same indexed read as `observable_get`, but typed as always returning a
   factory. Only sound when the caller has already checked that the key is
   present. */
[@bs.get_index]
external observable_unsafe_get:
  (observableT('a), string) => observableFactoryT('a) =
  "";
/* Indexed write that stores a factory function on an observable object under
   the given key.
   NOTE(review): the stored function is typed as curried `unit => ...`, while
   the getters above type the same slot as uncurried `(. unit) => ...` —
   confirm that both compile to a plain zero-argument JS function. */
[@bs.set_index]
external observable_set:
  (observableT('a), string, unit => observableT('a)) => unit =
  "";
43
/* Wraps a JS observable as a source.

   If `input` carries an entry under `observableSymbol`, that entry is invoked
   once, when `fromObservable` is called, and its result is what gets
   subscribed to; otherwise `input` itself is used.

   For every sink the source is run with:
   - an observer is subscribed whose `next` forwards `Push(value)` and whose
     `complete` forwards `End`; the observer's `error` callback ignores its
     argument and sends nothing to the sink;
   - after subscribing, the sink receives `Start` with a talkback that
     unsubscribes on `Close` and ignores every other talkback signal. */
[@genType]
let fromObservable = (input: observableT('a)): sourceT('a) => {
  /* The presence check and the actual call are two separate index reads.
     NOTE(review): presumably the call is written directly on the index
     expression so that it is emitted as a method call on `input` — confirm
     against the compiled output before collapsing the two reads. */
  let resolved =
    switch (input->observable_get(observableSymbol)) {
    | None => input
    | Some(_) => (input->observable_unsafe_get(observableSymbol))(.)
    };

  curry(sink => {
    let forward: observableObserverT('a) =
      [@bs]
      {
        as _;
        pub next = item => sink(. Push(item));
        pub error = _ => ();
        pub complete = () => sink(. End)
      };

    /* Subscribing happens before `Start` is delivered to the sink. */
    let handle = resolved##subscribe(forward);

    let talkback =
      (. request) =>
        switch (request) {
        | Close => handle##unsubscribe()
        | _ => ()
        };

    sink(. Start(talkback));
  });
};
75
/* Mutable per-subscription state used by `toObservable`.
   - talkback: function used to send talkback signals to the source; it is
     initialised to a placeholder and replaced when `Start` is received.
   - ended: set to true once the source has ended or the subscriber has
     unsubscribed. */
type observableStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};
80
/* Exposes a source as a JS observable.

   The returned object has a `subscribe` method and additionally stores, under
   `observableSymbol`, a function returning the object itself.

   Each `subscribe(observer)` call runs `source` with a fresh sink:
   - `Start` stores the talkback and sends the first `Pull`;
   - `Push(x)` calls `observer.next(x)` and then sends another `Pull`;
   - `End` marks the subscription as ended and calls `observer.complete()`.

   Once the subscription has ended (by `End` or by `unsubscribe`) no further
   observer callback is invoked: late `Push` and `End` signals are dropped, so
   `complete` can neither fire after `unsubscribe` nor fire twice. A `Start`
   that arrives after `unsubscribe` is answered with `Close` instead of `Pull`.

   The returned subscription's `unsubscribe` sends `Close` to the source at
   most once and is a no-op when the subscription has already ended. The
   observer's `error` method is never called by this function. */
[@genType]
let toObservable = (source: sourceT('a)): observableT('a) => {
  let observable: observableT('a) =
    [@bs]
    {
      as _;
      pub subscribe =
          (observer: observableObserverT('a)): observableSubscriptionT => {
        let state: observableStateT = {
          talkback: talkbackPlaceholder,
          ended: false,
        };

        source((. signal) =>
          switch (signal) {
          /* Unsubscribed before the source started: close it right away
             rather than pulling values nobody will receive. */
          | Start(tb) when state.ended => tb(. Close)
          | Start(tb) =>
            state.talkback = tb;
            tb(. Pull);
          | Push(_) when state.ended => ()
          | Push(x) =>
            observer##next(x);
            /* The observer may have unsubscribed from inside `next`; in that
               case `Close` was already sent and no `Pull` must follow it. */
            if (!state.ended) {
              state.talkback(. Pull);
            };
          /* Ignore `End` after unsubscribe or a repeated `End`. */
          | End when state.ended => ()
          | End =>
            state.ended = true;
            observer##complete();
          }
        );

        [@bs]
        {
          as _;
          pub unsubscribe = () =>
            if (!state.ended) {
              state.ended = true;
              state.talkback(. Close);
            }
        };
      }
    };

  /* Make the object return itself when asked for its observable. */
  observable->observable_set(observableSymbol, () => observable);
  observable;
};