1open Wonka_types;
2open Wonka_helpers;
3
/* Creates a source from the elements of `arr`, visited in index order.
   Inside the sink callback the array size is read once and a cursor
   starts at 0. The producer passed to `makeTrampoline` returns
   `Some(element)` while the cursor is below that size and `None` after. */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let length = Rebel.Array.size(arr);
    let cursor = ref(0);

    makeTrampoline(sink, (.) =>
      if (cursor^ >= length) {
        None;
      } else {
        let position = cursor^;
        let element = Rebel.Array.getUnsafe(arr, position);
        /* Advance only after the element has been read. */
        cursor := position + 1;
        Some(element);
      }
    );
  });
20
/* Creates a source from the elements of `ls`, head first.
   The producer passed to `makeTrampoline` pops one element off the
   remaining list per call and returns `None` once the list is empty. */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let remaining = ref(ls);

    makeTrampoline(sink, (.) =>
      switch (remaining^) {
      | [] => None
      | [head, ...tail] =>
        remaining := tail;
        Some(head);
      }
    );
  });
35
/* Creates a source that emits the single value `x`.
   On the first `Pull` the sink receives `Push(x)` followed by `End`.
   Later `Pull`s, and any `Pull` after a `Close`, do nothing. */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let finished = ref(false);

    let talkback = (. signal) =>
      switch (signal) {
      | Close => finished := true
      | Pull =>
        if (! finished^) {
          /* Flip the flag before pushing so a Pull arriving while the
             value is being delivered is ignored. */
          finished := true;
          sink(. Push(x));
          sink(. End);
        }
      };

    sink(. Start(talkback));
  });
55
/* Mutable bookkeeping for one `make` subscription.
   - teardown: function invoked when the sink closes the source.
   - ended: true once the source has completed or has been closed. */
type makeStateT = {
  mutable teardown: (. unit) => unit,
  mutable ended: bool,
};

/* Creates a source driven by the producer `f`.

   `f` receives an observer record:
   - `next(value)` pushes `value` to the sink unless the source has ended;
   - `complete()` sends `End` to the sink once and marks the source ended.
   `f` returns a teardown function, which is run when the sink sends
   `Close` before the source has ended.

   The sink receives `Start` before `f` is called, so it may send `Close`
   before the teardown returned by `f` exists: either during `Start`, or
   in reply to a `next` that `f` issues synchronously. Until the real
   teardown is installed, the placeholder teardown only records that a
   close happened. The real teardown is then run as soon as `f` returns,
   so whatever `f` set up is always released. */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    /* Set by the placeholder teardown when Close arrives too early. */
    let closedEarly = ref(false);
    let state: makeStateT = {
      teardown: (.) => closedEarly := true,
      ended: false,
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close when !state.ended =>
            state.ended = true;
            state.teardown(.);
          | _ => ()
          },
      ),
    );

    state.teardown =
      f(. {
        next: value =>
          if (!state.ended) {
            sink(. Push(value));
          },
        complete: () =>
          if (!state.ended) {
            state.ended = true;
            sink(. End);
          },
      });

    /* A Close that arrived before the assignment above only reached the
       placeholder; run the real teardown now. `ended` is already true, so
       the talkback cannot invoke it a second time. */
    if (closedEarly^) {
      state.teardown(.);
    };
  });
91
/* Mutable bookkeeping for one subject.
   - sinks: the sinks currently subscribed to the subject's source.
   - ended: true once `complete` has been called. */
type subjectState('a) = {
  mutable sinks: Rebel.Array.t(sinkT('a)),
  mutable ended: bool,
};

/* Creates a subject: a record with a `source` plus the `next` and
   `complete` functions that feed it.
   - `source` registers the sink and sends it `Start`; when that sink
     sends `Close` it is removed from the subscriber array.
   - `next(value)` sends `Push(value)` to every registered sink, unless
     the subject has already completed.
   - `complete()` marks the subject ended and sends `End` to every
     registered sink; later calls do nothing. */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  /* Delivers one signal to every sink registered at the time of the call. */
  let broadcast = signal =>
    Rebel.Array.forEach(state.sinks, subscriber => subscriber(. signal));

  let source = sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    /* Sinks are compared by physical identity when unsubscribing. */
    let unsubscribe = () =>
      state.sinks = Rebel.Array.filter(state.sinks, other => other !== sink);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => unsubscribe()
          | _ => ()
          },
      ),
    );
  };

  let next = value =>
    if (state.ended) {
      ();
    } else {
      broadcast(Push(value));
    };

  let complete = () =>
    if (state.ended) {
      ();
    } else {
      state.ended = true;
      broadcast(End);
    };

  {source, next, complete};
};
131
/* Source that emits no values: it sends `Start` to the sink and then
   `End`, unless the sink sent `Close` while handling `Start`. */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let closed = ref(false);

  let talkback = (. signal) =>
    switch (signal) {
    | Close => closed := true
    | _ => ()
    };

  sink(. Start(talkback));

  if (closed^) {
    ();
  } else {
    sink(. End);
  };
};
149
/* Source that sends the sink `Start(talkbackPlaceholder)` and nothing
   else: no `Push` and no `End` are ever sent from here. */
[@genType]
let never = (sink: sinkT('a)): unit => sink(. Start(talkbackPlaceholder));