1open Wonka_types;
2open Wonka_helpers;
3
/* Property key under which an object exposes its Observable interop factory.
   Evaluated as raw JS: when `Symbol` is a function this is
   `Symbol.observable`, which is first assigned `Symbol('observable')` if it
   is unset; otherwise it is the string '@@observable'.
   NOTE(review): annotated as `string`, but the first branch yields a JS
   symbol at runtime. In this file the value is only passed as a property
   index to the get_index/set_index bindings. */
let observableSymbol: string = [%raw
 {|
 typeof Symbol === 'function'
 ? Symbol.observable || (Symbol.observable = Symbol('observable'))
 : '@@observable'
|}
];
11
/* JS object type for the handle returned by `subscribe`: it exposes a single
   `unsubscribe` method. The genType definition is imported from the JS shim. */
[@genType.import "../shims/Js.shim"]
type observableSubscriptionT = {. [@bs.meth] "unsubscribe": unit => unit};

/* Index-assignment binding: sets a boolean property, addressed by string
   key, on a subscription object. */
[@bs.set_index]
external subscription_set: (observableSubscriptionT, string, bool) => unit;
17
/* JS object type for an observer: `next` receives a value, `error` receives
   a JS exception, and `complete` takes no argument. */
[@genType.import "../shims/Js.shim"]
type observableObserverT('a) = {
 .
 [@bs.meth] "next": 'a => unit,
 [@bs.meth] "error": Js.Exn.t => unit,
 [@bs.meth] "complete": unit => unit,
};

/* JS object type for an observable: `subscribe` takes an observer and
   returns a subscription handle. */
[@genType.import "../shims/Js.shim"]
type observableT('a) = {
 .
 [@bs.meth] "subscribe": observableObserverT('a) => observableSubscriptionT,
};
31
/* Uncurried zero-argument function that returns an observable; this is the
   shape of the value stored under `observableSymbol`. */
type observableFactoryT('a) = (. unit) => observableT('a);

/* Index read of a property on an observable, typed as optional so callers
   can test whether the property is present. */
[@bs.get_index]
external observable_get:
 (observableT('a), string) => option(observableFactoryT('a));
/* Same index read, typed as always present; use only after a presence
   check. */
[@bs.get_index]
external observable_unsafe_get:
 (observableT('a), string) => observableFactoryT('a);
/* Index assignment of a factory function onto an observable. */
[@bs.set_index]
external observable_set:
 (observableT('a), string, unit => observableT('a)) => unit;
43
/* Adapts an Observable-like JS object into a Wonka source.

   If `input` carries a factory under `observableSymbol`, that factory is
   invoked and its result is the observable subscribed to; otherwise `input`
   itself is used.

   For every sink, the observable is subscribed to first and only afterwards
   is `Start` delivered. `next` values are forwarded as `Push`, `complete` as
   `End`, and `error` notifications are ignored. The talkback reacts to
   `Close` by unsubscribing and ignores every other signal. */
[@genType]
let fromObservable = (input: observableT('a)): sourceT('a) => {
  /* The factory is re-read and called directly off `input` via index access
     rather than through the matched option payload. */
  let target =
    switch (input->observable_get(observableSymbol)) {
    | None => input
    | Some(_) => (input->observable_unsafe_get(observableSymbol))(.)
    };

  curry(sink => {
    let bridge: observableObserverT('a) =
      [@bs]
      {
        as _;
        pub next = item => sink(. Push(item));
        pub complete = () => sink(. End);
        pub error = _ => ()
      };

    let handle = target##subscribe(bridge);

    /* Only `Close` has an effect; any other talkback signal is a no-op. */
    let talkback: (. talkbackT) => unit =
      (. request) =>
        switch (request) {
        | Close => handle##unsubscribe()
        | _ => ()
        };

    sink(. Start(talkback));
  });
};
75
/* Mutable per-subscription state used by `toObservable`. */
type observableStateT = {
 /* Talkback received from the source's `Start` signal; used to send `Pull`
    and `Close` back to the source. */
 mutable talkback: (. talkbackT) => unit,
 /* Set once the source has sent `End` or the subscription was unsubscribed;
    later `Push` signals are then dropped. */
 mutable ended: bool,
};
80
/* Exposes a Wonka source as an Observable-like JS object.

   The returned object has a `subscribe` method and, under
   `observableSymbol`, a factory that returns the object itself.

   `subscribe` starts the source and pulls values one at a time: each `Push`
   is forwarded to the observer's `next` and followed by another `Pull`;
   `End` is forwarded to `complete`. The returned subscription carries a
   boolean `closed` property that starts as `false` and becomes `true` once
   `unsubscribe` has cancelled the source.

   NOTE(review): when the source ends on its own, `complete` is called but
   `closed` is not updated; only `unsubscribe` flips it. */
[@genType]
let toObservable = (source: sourceT('a)): observableT('a) => {
  let observable: observableT('a) =
    [@bs]
    {
      as _;
      pub subscribe =
          (_observer: observableObserverT('a)): observableSubscriptionT => {
        /* The raw JS below refers to the parameter by its literal name, so
           it must remain `_observer`. An object observer contributes its
           bound `next`; anything else is used as the `next` callback
           itself, with a no-op fallback. */
        let next: (. 'a) => unit = [%raw
          {|
            (typeof _observer === 'object' ? _observer.next.bind(_observer) : _observer) || function () {}
          |}
        ];

        /* For the callback-style call the completion handler is read from
           the third positional argument. */
        let complete: (. unit) => unit = [%raw
          {|
            (typeof _observer === 'object' ? _observer.complete.bind(_observer) : arguments[2]) || function () {}
          |}
        ];

        let state: observableStateT = {
          talkback: talkbackPlaceholder,
          ended: false,
        };

        source((. signal) =>
          switch (signal) {
          | Start(x) =>
            /* Keep the talkback for later pulls/cancellation and request the
               first value. */
            state.talkback = x;
            x(. Pull);
          | Push(x) when !state.ended =>
            next(. x);
            state.talkback(. Pull);
          /* Values arriving after the end/unsubscribe are dropped. */
          | Push(_) => ()
          | End =>
            state.ended = true;
            complete(.);
          }
        );

        let subscription =
          [@bs]
          {
            as self;
            pub unsubscribe = () =>
              if (!state.ended) {
                /* Fix: mark the subscription as closed. This previously
                   wrote `false`, so `closed` never changed. */
                self->subscription_set("closed", true);
                state.ended = true;
                state.talkback(. Close);
              }
          };

        /* A fresh subscription starts out open. */
        subscription->subscription_set("closed", false);
        subscription;
      }
    };

  /* Interop: the object returns itself from its `observableSymbol` factory. */
  observable->observable_set(observableSymbol, () => observable);
  observable;
};