1open Wonka_types;
2open Wonka_helpers;
3
/**
 * Property key used for Observable interop.
 *
 * Evaluated as raw JavaScript: when `Symbol` is a function the key is
 * `Symbol.observable`, which is created with `Symbol('observable')` and
 * assigned onto `Symbol` if it is currently falsy. Otherwise the key is the
 * string '@@observable'.
 *
 * NOTE: annotated as `string` so it can be passed to the index externals in
 * this file, although the runtime value may be a JS symbol.
 */
let observableSymbol: string = [%raw
  {|
  typeof Symbol === 'function'
    ? Symbol.observable || (Symbol.observable = Symbol('observable'))
    : '@@observable'
|}
];
11
/**
 * JS object returned by `subscribe`; exposes a single `unsubscribe()` method.
 */
type subscriptionT = {
  .
  [@bs.meth] "unsubscribe": unit => unit,
};
13
/**
 * JS object handed to an observable's `subscribe` method.
 *
 * - `next(value)` receives each value of type `'a`.
 * - `error(exn)` receives a `Js.Exn.t`.
 * - `complete()` takes no payload.
 */
type observerT('a) = {
  .
  [@bs.meth] "next": 'a => unit,
  [@bs.meth] "error": Js.Exn.t => unit,
  [@bs.meth] "complete": unit => unit,
};
20
/**
 * JS object with a `subscribe(observer)` method that returns a
 * `subscriptionT`.
 */
type observableT('a) = {
  .
  [@bs.meth] "subscribe": observerT('a) => subscriptionT,
};
25
/**
 * Uncurried zero-argument function producing an observable. This is the type
 * given to the value stored under `observableSymbol` when it is read back.
 */
type observableFactoryT('a) = (. unit) => observableT('a);
27
/**
 * Indexed read `observable[key]`, typed as an optional factory so callers can
 * test for its presence.
 * NOTE(review): presumably a missing property (`undefined`) reads as `None`;
 * confirm against the BuckleScript option representation in use.
 */
[@bs.get_index]
external observable_get:
  (observableT('a), string) => option(observableFactoryT('a)) = "";
/**
 * Indexed read `observable[key]`, typed as a factory with no option wrapper.
 * Nothing here checks that the property exists, so callers must establish
 * that themselves before invoking the result.
 */
[@bs.get_index]
external observable_unsafe_get:
  (observableT('a), string) => observableFactoryT('a) = "";
/**
 * Indexed write `observable[key] = factory`, storing a function that returns
 * an observable under the given key.
 */
[@bs.set_index]
external observable_set:
  (observableT('a), string, unit => observableT('a)) => unit = "";
40
/**
 * Converts a JS Observable-like object into a Wonka source.
 *
 * If `input` has a property under `observableSymbol`, that property is called
 * and its result is subscribed to; otherwise `input` itself is subscribed to.
 *
 * For each sink the returned source:
 *  1. subscribes an observer that forwards `next` as `Push` and `complete`
 *     as `End`;
 *  2. afterwards sends `Start` with a talkback that unsubscribes on `Close`
 *     and ignores every other talkback signal.
 *
 * NOTE: the observer's `error` callback discards its argument and sends
 * nothing to the sink, so an errored observable produces no `End`.
 * NOTE: the subscription is created before `Start` is sent, so anything the
 * observable emits during `subscribe` reaches the sink ahead of `Start`.
 */
let fromObservable = (input: observableT('a)): sourceT('a) => {
  let target =
    switch (input->observable_get(observableSymbol)) {
    | None => input
    | Some(_) =>
      /* The property is read again and invoked in the same expression rather
         than calling the matched value. Presumably this keeps the call in
         method form on `input`; verify against the compiled output before
         simplifying. */
      (input->observable_unsafe_get(observableSymbol))(.)
    };

  curry(sink => {
    let forwarder: observerT('a) =
      [@bs]
      {
        as _;
        pub next = value => sink(. Push(value));
        pub error = _ => ();
        pub complete = () => sink(. End)
      };

    let handle = target##subscribe(forwarder);

    /* Only `Close` is acted upon; any other talkback signal is a no-op. */
    let talkback =
      (. signal: talkbackT) =>
        switch (signal) {
        | Close => handle##unsubscribe()
        | _ => ()
        };

    sink(. Start(talkback));
  });
};
71
/**
 * Mutable per-subscription state used by `toObservable`.
 *
 * - `talkback`: function used to send talkback signals to the source; it is
 *   replaced when the source delivers `Start`.
 * - `ended`: set once the source has ended or the subscriber has
 *   unsubscribed.
 */
type observableStateT = {
  mutable talkback: (. talkbackT) => unit,
  mutable ended: bool,
};
76
/**
 * Converts a Wonka source into a JS Observable-like object.
 *
 * The returned object has a `subscribe(observer)` method and, under
 * `observableSymbol`, a function returning the object itself.
 *
 * Each `subscribe` call runs `source` with a sink that:
 *  - on `Start`, stores the talkback and immediately sends `Pull`;
 *  - on `Push`, calls `observer.next` and then sends another `Pull`, unless
 *    the subscription has ended;
 *  - on `End`, marks the subscription ended and calls `observer.complete`,
 *    at most once and never after `unsubscribe()`.
 *
 * The returned subscription's `unsubscribe()` marks the subscription ended
 * and sends `Close` to the source; repeated calls are no-ops.
 */
let toObservable = (source: sourceT('a)): observableT('a) => {
  let observable: observableT('a) =
    [@bs]
    {
      as _;
      pub subscribe = (observer: observerT('a)): subscriptionT => {
        let state: observableStateT = {
          talkback: talkbackPlaceholder,
          ended: false,
        };

        source((. signal) =>
          switch (signal) {
          | Start(x) =>
            state.talkback = x;
            x(. Pull);
          | Push(x) when !state.ended =>
            observer##next(x);
            /* The observer may have unsubscribed from inside `next`; in that
               case `Close` has already been sent, so do not follow it with
               a `Pull`. */
            if (!state.ended) {
              state.talkback(. Pull);
            };
          | Push(_) => ()
          | End when !state.ended =>
            state.ended = true;
            observer##complete();
          /* Already ended or unsubscribed: do not complete the observer
             again. */
          | End => ()
          }
        );

        [@bs]
        {
          as _;
          pub unsubscribe = () =>
            if (!state.ended) {
              state.ended = true;
              state.talkback(. Close);
            }
        };
      }
    };

  /* Expose the interop key so consumers that look it up get this object. */
  observable->observable_set(observableSymbol, () => observable);
  observable;
};