1open Wonka_types;
2
/* Mutable bookkeeping shared by the closures of a single subject. */
type subjectState('a) = {
  /* Sinks currently registered with the subject. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Flipped to `true` once the subject has been completed. */
  mutable ended: bool,
};
7
/* Creates a subject: a source that can be driven imperatively.
 *
 * Returns a record with three fields:
 *  - `source`:   subscribes a sink; the sink receives `Start` with a talkback
 *                and is then forwarded every value passed to `next`.
 *  - `next`:     pushes a value to every currently registered sink. Does
 *                nothing once the subject has completed.
 *  - `complete`: sends `End` to every registered sink exactly once. Further
 *                calls are no-ops.
 *
 * A sink that subscribes after completion is not registered; it receives
 * `Start` immediately followed by `End`, so downstream consumers terminate
 * instead of waiting for a signal that can never arrive.
 */
let makeSubject = () => {
  let state: subjectState('a) = {
    sinks: Rebel.Array.makeEmpty(),
    ended: false,
  };

  let source = sink =>
    if (state.ended) {
      /* Late subscriber: the subject is already done. Hand over a talkback
         that ignores every signal, then end right away. */
      sink(. Start((. _signal) => ()));
      sink(. End);
    } else {
      /* Register before sending Start so that a Close issued synchronously
         from within the Start handler finds the sink and removes it. */
      state.sinks = Rebel.Array.append(state.sinks, sink);
      sink(.
        Start(
          (. signal) =>
            switch (signal) {
            | Close =>
              /* Sinks are compared by identity: drop exactly this one. */
              state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink)
            | _ => ()
            },
        ),
      );
    };

  let next = value =>
    if (!state.ended) {
      Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
    };

  let complete = () =>
    if (!state.ended) {
      state.ended = true;
      /* Take a snapshot and clear the list first so the subject no longer
         holds on to any sink once it has completed. */
      let sinks = state.sinks;
      state.sinks = Rebel.Array.makeEmpty();
      Rebel.Array.forEach(sinks, sink => sink(. End));
    };

  {source, next, complete};
};