1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping used by `share` for one shared source. */
type shareStateT('a) = {
  /* Sinks currently subscribed to the shared source. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Talkback received from the upstream source's `Start` signal; holds
     `talkbackPlaceholder` until the first `Start` arrives. */
  mutable talkback: (.talkbackT) => unit,
  /* True while a `Pull` has been forwarded upstream and no `Push` has
     been received since; further `Pull`s are dropped in that window. */
  mutable gotSignal: bool
};
9
/* Shares a single subscription to `source` between all sinks that subscribe
   to the returned source.

   - The upstream `source` is subscribed to when the sink list goes from
     empty to one entry, i.e. on the first subscriber and again on the first
     subscriber after every sink has closed or the source has ended.
   - `Push` and `End` signals from upstream are forwarded to every sink in
     the list; `End` also clears the list.
   - A sink's `Close` removes it from the list; when the list becomes empty
     the `Close` is forwarded upstream.
   - A sink's `Pull` is forwarded upstream only if no other `Pull` is
     outstanding (tracked by `gotSignal`), so several sinks pulling at once
     result in a single upstream `Pull` per pushed value. */
let share = source => {
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    if (Rebel.Array.size(state.sinks) === 1) {
      /* A new upstream subscription is about to start, so no pull can be
         outstanding on it. Without this reset, a `Pull` that was forwarded
         to a previous subscription and never answered (last sink closed, or
         the source ended, before a `Push`) would leave `gotSignal` stuck at
         `true` and every later `Pull` would be silently dropped. */
      state.gotSignal = false;

      source((.signal) => {
        switch (signal) {
        | Push(_) => {
          /* A value arrived: the outstanding pull (if any) is satisfied. */
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(.signal));
        }
        | Start(x) => state.talkback = x
        | End => {
          Rebel.Array.forEach(state.sinks, sink => sink(.End));
          state.sinks = Rebel.Array.makeEmpty();
        }
        }
      });
    };

    /* Every sink gets its own talkback that funnels into the shared state. */
    sink(.Start((.signal) => {
      switch (signal) {
      | Close => {
        /* Drop this sink (compared by physical identity). */
        state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
        if (Rebel.Array.size(state.sinks) === 0) {
          /* Nobody is listening any more: release the upstream source. */
          state.talkback(.Close);
        };
      }
      | Pull when !state.gotSignal => {
        state.gotSignal = true;
        state.talkback(.signal);
      }
      /* A pull is already in flight upstream; ignore the duplicate. */
      | Pull => ()
      }
    }));
  }
};