1open Wonka_types;
2
/* Re-exports `Wonka_observable.fromObservable` under this module's namespace
   so callers can reach it without referencing `Wonka_observable` directly.
   NOTE(review): presumably builds a Wonka source from an Observable; the
   implementation lives in `Wonka_observable` — confirm there. */
[@genType]
let fromObservable = Wonka_observable.fromObservable;
/* Re-exports `Wonka_observable.toObservable` under this module's namespace.
   NOTE(review): presumably the inverse of `fromObservable` (Wonka source to
   Observable); the implementation lives in `Wonka_observable` — confirm there. */
[@genType]
let toObservable = Wonka_observable.toObservable;
7
/* Re-exports `Wonka_callbag.fromCallbag` under this module's namespace.
   NOTE(review): presumably builds a Wonka source from a callbag; the
   implementation lives in `Wonka_callbag` — confirm there. */
[@genType]
let fromCallbag = Wonka_callbag.fromCallbag;
/* Re-exports `Wonka_callbag.toCallbag` under this module's namespace.
   NOTE(review): presumably the inverse of `fromCallbag` (Wonka source to
   callbag); the implementation lives in `Wonka_callbag` — confirm there. */
[@genType]
let toCallbag = Wonka_callbag.toCallbag;
12
13/* operators */
14
/* Mutable bookkeeping shared by the callbacks of the `debounce` operator. */
type debounceStateT = {
  /* Handle of the currently scheduled timeout; `None` when nothing is pending. */
  mutable id: option(Js.Global.timeoutId),
  /* Set when the source ends while a timeout is still pending, so that `End`
     is emitted right after the pending value has been pushed. */
  mutable deferredEnded: bool,
  /* Set once the sink has sent `Close` or the source has sent `End`. */
  mutable ended: bool,
};
20
/* Debounce operator.

   Every value pushed by the source (re)starts a timer whose duration in
   milliseconds is `f(. value)`; the value is forwarded to the sink only when
   its timer fires, i.e. when no newer value has replaced it in the meantime.

   - `Close` from the sink cancels the pending timer and is forwarded upstream.
   - `End` from the source is forwarded immediately when nothing is pending;
     otherwise it is deferred until the pending value has been delivered.
   - Once ended, further talkback signals and source signals are ignored. */
[@genType]
let debounce = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: debounceStateT = {
        id: None,
        deferredEnded: false,
        ended: false,
      };

      /* Forget and cancel the pending timer, if there is one. */
      let cancelPending = () =>
        switch (state.id) {
        | None => ()
        | Some(pending) =>
          state.id = None;
          Js.Global.clearTimeout(pending);
        };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          /* Talkback handed to the sink; inert once the stream has ended. */
          let talkback = (. request) =>
            if (!state.ended) {
              switch (request) {
              | Pull => upstream(. Pull)
              | Close =>
                state.ended = true;
                state.deferredEnded = false;
                cancelPending();
                upstream(. Close);
              };
            };
          sink(. Start(talkback));
        | Push(value) =>
          if (!state.ended) {
            cancelPending();
            /* Runs when the debounce window elapses without a newer value. */
            let flush = () => {
              state.id = None;
              sink(. signal);
              if (state.deferredEnded) {
                sink(. End);
              };
            };
            state.id = Some(Js.Global.setTimeout(flush, f(. value)));
          }
        | End =>
          if (!state.ended) {
            state.ended = true;
            switch (state.id) {
            /* A value is still waiting: let its timer emit `End` afterwards. */
            | Some(_) => state.deferredEnded = true
            | None => sink(. End)
            };
          }
        }
      );
    })
  );
84
/* Delay operator.

   Forwards `Start` to the sink immediately and re-emits every other signal
   (`Push` and `End`) after `wait` milliseconds.

   The talkback handed to the sink is wrapped so that a `Close` request is
   remembered: signals whose timers are still pending at that point, as well
   as signals arriving afterwards, are dropped instead of being delivered to a
   sink that has already closed. This mirrors how `debounce` and `throttle`
   intercept `Close`. The pending timers themselves are not cancelled (their
   handles are not tracked); they simply become no-ops.

   The previous implementation kept an `active` counter that was incremented
   before each timer was scheduled and therefore could never be 0 when a
   timer fired, so its guard never prevented a delivery. */
[@genType]
let delay = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Becomes true once the sink has sent `Close`. */
      let closed = ref(false);

      source((. signal) =>
        switch (signal) {
        | Start(tb) =>
          sink(.
            Start(
              (. signal) =>
                switch (signal) {
                | Close =>
                  closed := true;
                  tb(. Close);
                | _ => tb(. signal)
                },
            ),
          )
        /* The sink has closed: do not schedule anything new. */
        | _ when closed^ => ()
        | _ =>
          ignore(
            Js.Global.setTimeout(
              () =>
                /* Re-check at fire time; the sink may have closed meanwhile. */
                if (! closed^) {
                  sink(. signal);
                },
              wait,
            ),
          )
        }
      );
    })
  );
110
/* Throttle operator.

   Forwards a pushed value to the sink and then ignores all further values
   until a timer of `f(. value)` milliseconds has elapsed. `End` from the
   source and `Close` from the sink both cancel the running timer before being
   passed along. */
[@genType]
let throttle = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* True while values are being suppressed. */
      let muted = ref(false);
      let timer: ref(option(Js.Global.timeoutId)) = ref(None);

      /* Cancel the running timer, if any (the stored handle is left as is). */
      let stopTimer = () =>
        switch (timer^) {
        | None => ()
        | Some(handle) => Js.Global.clearTimeout(handle)
        };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          /* Every talkback signal goes upstream; `Close` also stops the timer. */
          let talkback = (. request) => {
            switch (request) {
            | Close => stopTimer()
            | _ => ()
            };
            upstream(. request);
          };
          sink(. Start(talkback));
        | End =>
          stopTimer();
          sink(. End);
        | Push(value) =>
          if (! muted^) {
            muted := true;
            stopTimer();
            /* Re-opens the gate once the throttle window is over. */
            let reopen = () => {
              timer := None;
              muted := false;
            };
            timer := Some(Js.Global.setTimeout(reopen, f(. value)));
            sink(. signal);
          }
        }
      );
    })
  );
159
160/* sinks */
/* Subscribes to `source` through `Wonka_operators.takeLast(1, ...)` and
   returns a promise that is resolved with the value pushed by that operator.
   The sink requests data with a single `Pull` on `Start`. The promise is
   never rejected here, and `End` by itself does not settle it. */
[@genType]
let toPromise = (source: sourceT('a)): Js.Promise.t('a) =>
  Js.Promise.make((~resolve, ~reject as _) => {
    let sink = (. signal) =>
      switch (signal) {
      | End => ()
      | Push(value) => resolve(. value)
      | Start(talkback) => talkback(. Pull)
      };

    Wonka_operators.takeLast(1, source, sink);
  });
173
174/* sources */
/* Source that pushes an increasing counter (0, 1, 2, ...) every `p`
   milliseconds via `Js.Global.setInterval`. The interval is installed before
   `Start` is delivered and is cleared when the sink sends `Close`; any other
   talkback signal is ignored. */
[@genType]
let interval = (p: int): sourceT(int) =>
  curry(sink => {
    let counter = ref(0);

    /* Emit the current count, then advance it. */
    let tick = () => {
      let current = counter^;
      counter := current + 1;
      sink(. Push(current));
    };

    let timer = Js.Global.setInterval(tick, p);

    let talkback = (. request) =>
      switch (request) {
      | Close => Js.Global.clearInterval(timer)
      | _ => ()
      };

    sink(. Start(talkback));
  });
199
/* Source that pushes every `event` dispatched on `element`.

   `Start` is delivered to the sink first; the listener is attached afterwards
   and removed again when the sink sends `Close`. Other talkback signals are
   ignored.

   A sink may send `Close` synchronously while it is still handling `Start`.
   In that case the listener must not be attached at all: previously
   `removeEventListener` ran before `addEventListener`, which left the handler
   registered forever and pushing into a closed sink. The `closed` flag guards
   against that. */
[@genType]
let fromDomEvent = (element: Dom.element, event: string): sourceT(Dom.event) =>
  curry(sink => {
    let addEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.addEventListener(event, handler);
    }
  |}
    ];

    let removeEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.removeEventListener(event, handler);
    }
  |}
    ];

    /* Becomes true once the sink has sent `Close`. */
    let closed = ref(false);

    let handler = event => sink(. Push(event));

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            closed := true;
            removeEventListener(element, event, handler);
          | _ => ()
          },
      ),
    );

    /* Skip attaching when the sink already closed during `Start`. */
    if (! closed^) {
      addEventListener(element, event, handler);
    };
  });
233
/* Source backed by a promise: once `promise` resolves, its value is pushed to
   the sink followed by `End`. If the sink has sent `Close` before that
   happens, nothing is emitted. Rejections of `promise` are not handled here. */
[@genType]
let fromPromise = (promise: Js.Promise.t('a)): sourceT('a) =>
  curry(sink => {
    /* Set when the sink sends `Close`. */
    let cancelled = ref(false);

    /* Resolution callback: emit the value and complete, unless cancelled. */
    let deliver = value => {
      if (! cancelled^) {
        sink(. Push(value));
        sink(. End);
      };

      Js.Promise.resolve();
    };

    promise |> Js.Promise.then_(deliver) |> ignore;

    let talkback = (. request) =>
      switch (request) {
      | Close => cancelled := true
      | _ => ()
      };

    sink(. Start(talkback));
  });