1open Wonka_types;
2
/* Alias of `Wonka_observable.fromObservable`, re-exported from this module
   so callers can reach it without naming `Wonka_observable` directly. */
3[@genType]
4let fromObservable = Wonka_observable.fromObservable;
/* Alias of `Wonka_observable.toObservable`, re-exported from this module
   so callers can reach it without naming `Wonka_observable` directly. */
5[@genType]
6let toObservable = Wonka_observable.toObservable;
7
/* Alias of `Wonka_callbag.fromCallbag`, re-exported from this module
   so callers can reach it without naming `Wonka_callbag` directly. */
8[@genType]
9let fromCallbag = Wonka_callbag.fromCallbag;
/* Alias of `Wonka_callbag.toCallbag`, re-exported from this module
   so callers can reach it without naming `Wonka_callbag` directly. */
10[@genType]
11let toCallbag = Wonka_callbag.toCallbag;
12
13/* operators */
14
/* debounce(f): operator that holds back each pushed value until the source
   has been quiet for `f(. value)` milliseconds.

   - Every Push cancels the timer of the previous Push (if still pending) and
     schedules a new one; only when a timer fires is its Push forwarded.
   - End is forwarded immediately when no timer is pending; otherwise it is
     remembered and sent right after the pending value has been flushed.
   - Close from the sink cancels the pending timer and is forwarded upstream;
     every other talkback signal is forwarded unchanged. */
[@genType]
let debounce = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Handle of the timer that will flush the latest value, if any. */
      let pending: ref(option(Js.Global.timeoutId)) = ref(None);
      /* Becomes true once the source has sent End. */
      let sourceEnded = ref(false);

      /* Cancels the pending flush (if there is one) and forgets its handle. */
      let cancelPending = () =>
        switch (pending^) {
        | None => ()
        | Some(timer) =>
          pending := None;
          Js.Global.clearTimeout(timer);
        };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          sink(.
            Start(
              (. request) =>
                switch (request) {
                | Close =>
                  cancelPending();
                  upstream(. Close);
                | _ => upstream(. request)
                },
            ),
          )
        | Push(value) =>
          /* Runs once the quiet period has elapsed: forward the held Push
             and, if the source ended in the meantime, the deferred End. */
          let flush = () => {
            pending := None;
            sink(. signal);
            if (sourceEnded^) {
              sink(. End);
            };
          };

          cancelPending();
          pending := Some(Js.Global.setTimeout(flush, f(. value)));
        | End =>
          sourceEnded := true;

          /* With a flush still pending, End is emitted by `flush` instead. */
          switch (pending^) {
          | Some(_) => ()
          | None => sink(. End)
          };
        }
      );
    })
  );
70
/* Mutable bookkeeping shared by the callbacks that `delay` installs. */
type delayStateT = {
  /* Talkback received with the source's Start signal. */
  mutable talkback: (. talkbackT) => unit,
  /* Number of delayed signals whose timers have not fired yet. */
  mutable active: int,
  /* Set once the sink has sent Close. */
  mutable gotEndSignal: bool,
};

/* delay(wait): operator that re-emits every Push and End of the source
   `wait` milliseconds after it was received.

   Talkback behaviour:
   - Close marks the operator as closed, is forwarded to the source so that
     the source can release its resources, and End is sent to the sink when
     no delayed signal is outstanding. Signals that arrive from the source
     after Close are dropped; signals already scheduled are still delivered
     when their timer fires.
   - Any other talkback signal is forwarded to the source until Close has
     been received. */
[@genType]
let delay = (wait: int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      let state: delayStateT = {
        talkback: Wonka_helpers.talkbackPlaceholder,
        active: 0,
        gotEndSignal: false,
      };

      source((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkback = tb
        | _ when !state.gotEndSignal =>
          state.active = state.active + 1;
          ignore(
            Js.Global.setTimeout(
              () => {
                /* `active` was incremented before this timer was scheduled
                   and is only decremented here, so it is always >= 1 at this
                   point; no "already drained" case needs handling. */
                state.active = state.active - 1;
                sink(. signal);
              },
              wait,
            ),
          );
        | _ => ()
        }
      );

      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Propagate the first Close upstream; without this the source
                 would keep running after the sink has cancelled. */
              if (!state.gotEndSignal) {
                state.gotEndSignal = true;
                state.talkback(. Close);
              };

              if (state.active === 0) {
                sink(. End);
              };
            | _ when !state.gotEndSignal => state.talkback(. signal)
            | _ => ()
            },
        ),
      );
    })
  );
126
/* throttle(f): operator that forwards a pushed value immediately and then
   ignores further pushes until `f(. value)` milliseconds have passed.

   - End cancels the running timer and is forwarded to the sink.
   - Close from the sink cancels the running timer and is forwarded
     upstream; every other talkback signal is forwarded unchanged. */
[@genType]
let throttle = (f: (. 'a) => int): operatorT('a, 'a) =>
  curry(source =>
    curry(sink => {
      /* Handle of the timer that re-opens the gate, if one was scheduled. */
      let timer: ref(option(Js.Global.timeoutId)) = ref(None);
      /* True while pushes are being ignored. */
      let muted = ref(false);

      /* Cancels the scheduled timer, if any; the handle itself is kept. */
      let cancelTimer = () =>
        switch (timer^) {
        | None => ()
        | Some(handle) => Js.Global.clearTimeout(handle)
        };

      source((. signal) =>
        switch (signal) {
        | Start(upstream) =>
          sink(.
            Start(
              (. request) =>
                switch (request) {
                | Close =>
                  cancelTimer();
                  upstream(. Close);
                | _ => upstream(. request)
                },
            ),
          )
        | Push(value) =>
          if (! muted^) {
            muted := true;
            cancelTimer();

            /* Re-opens the gate once the throttle window has elapsed. */
            let reopen = () => {
              timer := None;
              muted := false;
            };
            timer := Some(Js.Global.setTimeout(reopen, f(. value)));

            sink(. signal);
          }
        | End =>
          cancelTimer();
          sink(. End);
        }
      );
    })
  );
175
176/* sinks */
/* toPromise(source): subscribes to `source` through
   `Wonka_operators.takeLast(1, ...)` and resolves the returned promise with
   the value that arrives as a Push.

   The sink sends a single Pull when it receives Start. End is ignored and
   the promise's `reject` is never used, so the promise stays pending if no
   Push is ever delivered. */
[@genType]
let toPromise = (source: sourceT('a)): Js.Promise.t('a) =>
  Js.Promise.make((~resolve, ~reject as _) => {
    let resolvingSink =
      (. signal) =>
        switch (signal) {
        | Start(talkback) => talkback(. Pull)
        | Push(value) => resolve(. value)
        | End => ()
        };

    Wonka_operators.takeLast(1, source, resolvingSink);
  });
189
190/* sources */
/* interval(p): source that pushes 0, 1, 2, ... to the sink, one value every
   `p` milliseconds, using `Js.Global.setInterval`.

   The interval is started before Start is sent to the sink. A Close talkback
   clears the interval; other talkback signals are ignored. The source never
   sends End by itself. */
[@genType]
let interval = (p: int): sourceT(int) =>
  curry(sink => {
    /* Next number to emit. */
    let counter = ref(0);

    /* Emits the current count and advances the counter. */
    let tick = () => {
      let current = counter^;
      counter := current + 1;
      sink(. Push(current));
    };

    let timer = Js.Global.setInterval(tick, p);

    sink(.
      Start(
        (. request) =>
          switch (request) {
          | Close => Js.Global.clearInterval(timer)
          | _ => ()
          },
      ),
    );
  });
215
/* fromDomEvent(element, event): source that pushes every DOM event named
   `event` dispatched on `element`.

   Start is sent to the sink first; the listener is attached afterwards.
   A Close talkback removes the listener; other talkback signals are ignored.
   If the sink already sends Close while it is handling Start, the listener
   is not attached at all (previously it was attached after the removal and
   therefore never cleaned up). The source never sends End by itself. */
[@genType]
let fromDomEvent = (element: Dom.element, event: string): sourceT(Dom.event) =>
  curry(sink => {
    let addEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.addEventListener(event, handler);
    }
  |}
    ];

    let removeEventListener: (Dom.element, string, Dom.event => unit) => unit = [%raw
      {|
    function (element, event, handler) {
      element.removeEventListener(event, handler);
    }
  |}
    ];

    /* Set when the sink has sent Close, possibly before we attach. */
    let closed = ref(false);

    /* The same function value is used for adding and removing the listener. */
    let handler = event => sink(. Push(event));

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            closed := true;
            removeEventListener(element, event, handler);
          | _ => ()
          },
      ),
    );

    /* Skip attaching when the sink cancelled synchronously during Start. */
    if (! closed^) {
      addEventListener(element, event, handler);
    };
  });
249
/* fromPromise(promise): source that pushes the value `promise` resolves
   with and then sends End.

   The continuation is registered before Start is sent to the sink. A Close
   talkback sets a flag that makes the continuation skip both Push and End;
   other talkback signals are ignored. No rejection handler is attached
   here, and the promise returned by `then_` is discarded. */
[@genType]
let fromPromise = (promise: Js.Promise.t('a)): sourceT('a) =>
  curry(sink => {
    /* True once the sink has sent Close. */
    let cancelled = ref(false);

    /* Continuation run with the resolved value. */
    let deliver = value => {
      if (! cancelled^) {
        sink(. Push(value));
        sink(. End);
      };

      Js.Promise.resolve();
    };

    promise |> Js.Promise.then_(deliver) |> ignore;

    sink(.
      Start(
        (. request) =>
          switch (request) {
          | Close => cancelled := true
          | _ => ()
          },
      ),
    );
  });