1open Wonka_types;
2
3type subjectState('a) = {
4 mutable sinks: Rebel.Array.t(sinkT('a)),
5 mutable ended: bool,
6};
7
8let makeSubject = () => {
9 let state: subjectState('a) = {
10 sinks: Rebel.Array.makeEmpty(),
11 ended: false,
12 };
13
14 let source = sink => {
15 state.sinks = Rebel.Array.append(state.sinks, sink);
16 sink(.
17 Start(
18 (. signal) =>
19 if (signal === Close) {
20 state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
21 },
22 ),
23 );
24 };
25
26 let next = value =>
27 if (!state.ended) {
28 Rebel.Array.forEach(state.sinks, sink => sink(. Push(value)));
29 };
30
31 let complete = () =>
32 if (!state.ended) {
33 state.ended = true;
34 Rebel.Array.forEach(state.sinks, sink => sink(. End));
35 };
36
37 {source, next, complete};
38};