1open Wonka_types;
2open Wonka_helpers;
3
4type subscribeStateT = {
5 mutable talkback: (. talkbackT) => unit,
6 mutable ended: bool,
7};
8
9[@genType]
10type subscribeConsumerT('a) = sourceT('a) => subscriptionT;
11
12[@genType]
13let subscribe = (f: (. 'a) => unit): subscribeConsumerT('a) =>
14 curry(source => {
15 let state: subscribeStateT = {
16 talkback: talkbackPlaceholder,
17 ended: false,
18 };
19
20 source((. signal) =>
21 switch (signal) {
22 | Start(x) =>
23 state.talkback = x;
24 x(. Pull);
25 | Push(x) when !state.ended =>
26 f(. x);
27 state.talkback(. Pull);
28 | Push(_) => ()
29 | End => state.ended = true
30 }
31 );
32
33 {
34 unsubscribe: () =>
35 if (!state.ended) {
36 state.ended = true;
37 state.talkback(. Close);
38 },
39 };
40 });
41
42[@genType]
43type forEachConsumerT('a) = sourceT('a) => unit;
44
45[@genType]
46let forEach = (f: (. 'a) => unit): forEachConsumerT('a) =>
47 curry(source => ignore(subscribe(f, source)));
48
49[@genType]
50let publish = (source: sourceT('a)): subscriptionT =>
51 subscribe((. _) => (), source);
52
53type toArrayStateT('a) = {
54 values: Rebel.MutableQueue.t('a),
55 mutable talkback: (. talkbackT) => unit,
56 mutable value: option('a),
57 mutable ended: bool,
58};
59
60[@genType]
61let toArray = (source: sourceT('a)): array('a) => {
62 let state: toArrayStateT('a) = {
63 values: Rebel.MutableQueue.make(),
64 talkback: talkbackPlaceholder,
65 value: None,
66 ended: false,
67 };
68
69 source((. signal) =>
70 switch (signal) {
71 | Start(x) =>
72 state.talkback = x;
73 x(. Pull);
74 | Push(value) =>
75 Rebel.MutableQueue.add(state.values, value);
76 state.talkback(. Pull);
77 | End => state.ended = true
78 }
79 );
80
81 if (!state.ended) {
82 state.talkback(. Close);
83 };
84
85 Rebel.MutableQueue.toArray(state.values);
86};