1open Wonka_types;
2open Wonka_helpers;
3
4type subscribeStateT = {
5 mutable talkback: (.talkbackT) => unit,
6 mutable ended: bool
7};
8
9let subscribe = f => curry(source => {
10 let state: subscribeStateT = {
11 talkback: talkbackPlaceholder,
12 ended: false
13 };
14
15 source((.signal) => {
16 switch (signal) {
17 | Start(x) => {
18 state.talkback = x;
19 x(.Pull);
20 }
21 | Push(x) when !state.ended => {
22 f(.x);
23 state.talkback(.Pull);
24 }
25 | Push(_) => ()
26 | End => state.ended = true;
27 }
28 });
29
30 {
31 unsubscribe: () =>
32 if (!state.ended) {
33 state.ended = true;
34 state.talkback(.Close);
35 }
36 }
37});
38
39let forEach = f => curry(source => {
40 ignore(subscribe(f, source));
41});