1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the pull-driven sources in this file
   (`fromArray` and `fromList`). */
type trampolineT('a) = {
  /* Set once `End` has been sent or `Close` has been received; stops the
     emit loop. */
  mutable ended: bool,
  /* True while the emit loop is running, so a `Pull` that arrives during a
     `Push` is only recorded instead of starting a nested loop. */
  mutable looping: bool,
  /* Set when a `Pull` is received and cleared right before each `Push`. */
  mutable pulled: bool,
  /* Cursor into the input: the next index for `fromArray`, the remaining
     list for `fromList`. */
  mutable current: 'a,
};
10
/**
 * Builds a source that pushes the elements of `arr` in index order, one
 * element per `Pull`, and sends `End` when a pull arrives after the last
 * element has been pushed. A `Close` from the sink stops further emission.
 */
[@genType]
let fromArray = (arr: array('a)): sourceT('a) =>
  curry(sink => {
    let size = Rebel.Array.size(arr);
    let state = {ended: false, looping: false, pulled: false, current: 0};

    /* Runs the emit loop. Each iteration needs `pulled` to be set again
       (by the sink pulling during `Push`) in order to continue, so the
       sink controls the pace without the calls nesting. */
    let drain = () => {
      state.pulled = true;
      state.looping = true;

      while (state.pulled && !state.ended) {
        let index = state.current;
        if (index >= size) {
          /* Input exhausted: finish the stream. */
          state.ended = true;
          sink(. End);
        } else {
          /* Advance the cursor and clear the pull flag before pushing, so
             that a pull made during the push is observed by the loop. */
          state.current = index + 1;
          state.pulled = false;
          sink(. Push(Rebel.Array.getUnsafe(arr, index)));
        };
      };

      state.looping = false;
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => state.ended = true
          /* Already inside the loop: just record the request. */
          | Pull when state.looping => state.pulled = true
          | Pull => drain()
          },
      ),
    );
  });
44
/**
 * Builds a source that pushes the elements of `ls` from head to tail, one
 * element per `Pull`, and sends `End` when a pull arrives after the list
 * has been consumed. A `Close` from the sink stops further emission.
 */
[@genType]
let fromList = (ls: list('a)): sourceT('a) =>
  curry(sink => {
    let state = {ended: false, looping: false, pulled: false, current: ls};

    /* Runs the emit loop. The loop only continues while the sink keeps
       setting `pulled` again (by pulling during `Push`). */
    let drain = () => {
      state.pulled = true;
      state.looping = true;

      while (state.pulled && !state.ended) {
        switch (state.current) {
        | [] =>
          /* Nothing left: finish the stream. */
          state.ended = true;
          sink(. End);
        | [head, ...tail] =>
          /* Move the cursor and clear the pull flag before pushing, so
             that a pull made during the push is observed by the loop. */
          state.current = tail;
          state.pulled = false;
          sink(. Push(head));
        };
      };

      state.looping = false;
    };

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => state.ended = true
          /* Already inside the loop: just record the request. */
          | Pull when state.looping => state.pulled = true
          | Pull => drain()
          },
      ),
    );
  });
77
/**
 * Builds a source that, on the first `Pull`, pushes `x` and then
 * immediately sends `End`. Later pulls are ignored, and a `Close` received
 * before the first pull prevents any emission.
 */
[@genType]
let fromValue = (x: 'a): sourceT('a) =>
  curry(sink => {
    let finished = ref(false);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => finished := true
          | Pull =>
            if (! finished^) {
              /* Mark as finished first so a pull issued by the sink while
                 handling `Push` is a no-op. */
              finished := true;
              sink(. Push(x));
              sink(. End);
            }
          },
      ),
    );
  });
97
/* Mutable state kept by `make` for each subscription. */
type makeStateT = {
  /* Cleanup function returned by the user callback; invoked when the sink
     sends `Close` before the source has ended. */
  mutable teardown: (. unit) => unit,
  /* Set when the observer completes or the sink closes; suppresses any
     further `Push`/`End` signals. */
  mutable ended: bool,
};
102
/**
 * Builds a source from a producer callback `f`.
 *
 * `f` receives an observer whose `next` pushes a value to the sink and
 * whose `complete` sends `End`; both become no-ops once the source has
 * ended. `f` returns a teardown function, which is called when the sink
 * sends `Close` before the source has ended. `Pull` signals are ignored.
 *
 * Note that `f` is invoked before `Start` is sent to the sink.
 */
[@genType]
let make = (f: (. observerT('a)) => teardownT): sourceT('a) =>
  curry(sink => {
    let state: makeStateT = {teardown: (.) => (), ended: false};

    let observer: observerT('a) = {
      next: value =>
        if (!state.ended) {
          sink(. Push(value));
        },
      complete: () =>
        if (!state.ended) {
          state.ended = true;
          sink(. End);
        },
    };

    /* Run the producer and keep its teardown for a later `Close`. */
    state.teardown = f(. observer);

    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Pull => ()
          | Close =>
            if (!state.ended) {
              state.ended = true;
              state.teardown(.);
            }
          },
      ),
    );
  });
133
/* Mutable state behind a subject created by `makeSubject`. */
type subjectState('a) = {
  /* Sinks currently subscribed; a sink is removed when it sends `Close`. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Set by `complete`; afterwards `next` and `complete` do nothing. */
  mutable ended: bool,
};
138
/**
 * Creates a subject: a record with a `source` that sinks can subscribe to,
 * plus `next` and `complete` functions that drive it from the outside.
 *
 * - `source` registers the sink and sends it `Start`; the sink is removed
 *   again when it sends `Close`. `Pull` signals are ignored.
 * - `next` pushes a value to every currently registered sink.
 * - `complete` sends `End` to every registered sink, once.
 *
 * After `complete` has been called, `next` and `complete` do nothing.
 */
[@genType]
let makeSubject = (): subjectT('a) => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink => {
    /* Drops this sink (compared by reference) from the subscriber list. */
    let unsubscribe = () => {
      state.sinks = Rebel.Array.filter(state.sinks, other => other !== sink);
    };

    state.sinks = Rebel.Array.append(state.sinks, sink);
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close => unsubscribe()
          | Pull => ()
          },
      ),
    );
  };

  let next = value =>
    state.ended
      ? ()
      : Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));

  let complete = () =>
    if (state.ended) {
      ();
    } else {
      state.ended = true;
      Rebel.Array.forEach(state.sinks, sink => sink(. End));
    };

  {source, next, complete};
};
173
/**
 * A source that emits no values: it sends `End` in response to the first
 * `Pull`, unless the sink has already sent `Close`.
 *
 * `ended` is set before `End` is sent so that the stream ends at most
 * once; any later `Pull` (including one issued while the sink is handling
 * `End`) is ignored. This matches how `fromValue`, `fromArray` and
 * `fromList` guard their own `End` signal.
 */
[@genType]
let empty = (sink: sinkT('a)): unit => {
  let ended = ref(false);
  sink(.
    Start(
      (. signal) => {
        switch (signal) {
        | Close => ended := true
        | Pull when ! ended^ =>
          /* Mark as ended first; otherwise repeated pulls would each
             trigger another `End`. */
          ended := true;
          sink(. End);
        | _ => ()
        }
      },
    ),
  );
};
189
/**
 * A source that only sends `Start` to the sink, passing
 * `talkbackPlaceholder` as the talkback. It never sends `Push` or `End`
 * itself.
 */
[@genType]
let never = (sink: sinkT('a)): unit =>
  sink(. Start(talkbackPlaceholder));