1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable state held by one `share` call and captured by every sink that
   subscribes through it. */
type shareStateT('a) = {
  /* Sinks currently subscribed: appended on subscription, filtered out when
     a sink sends Close, and reset to an empty array when the source ends. */
  mutable sinks: Rebel.Array.t(sinkT('a)),
  /* Talkback received from the underlying source's Start signal; Pull and
     Close requests from the sinks are forwarded through it. */
  mutable talkback: (. talkbackT) => unit,
  /* Set to true once a Pull has been forwarded to the source and back to
     false when the next Push arrives; while true, further Pulls are ignored. */
  mutable gotSignal: bool,
};
9
/* Shares one subscription to `source` between any number of sinks.

   `share(source)` returns a new source. The underlying `source` is started
   only when the sink list goes from empty to one element; every later sink
   is simply added to the list. Push and End signals from `source` are
   forwarded to all sinks in the list. Each sink receives its own talkback:
   - Close removes that sink from the list and, if the list is then empty,
     forwards Close to the underlying source;
   - Pull is forwarded to the underlying source at most once between two
     Push signals (tracked by `gotSignal`), so several sinks pulling at the
     same time produce a single upstream Pull. */
let share = source => {
  /* One state record per `share(source)` call, shared by all subscriptions. */
  let state = {
    sinks: Rebel.Array.makeEmpty(),
    talkback: talkbackPlaceholder,
    gotSignal: false,
  };

  sink => {
    state.sinks = Rebel.Array.append(state.sinks, sink);

    /* Only the first sink of an empty list starts the underlying source. */
    if (Rebel.Array.size(state.sinks) === 1) {
      source((. signal) =>
        switch (signal) {
        | Push(_) =>
          /* A value arrived, so the next Pull may go upstream again. */
          state.gotSignal = false;
          Rebel.Array.forEach(state.sinks, sink => sink(. signal));
        | Start(x) => state.talkback = x
        | End =>
          /* Detach the current sinks before notifying them. If the list were
             cleared after the notification, a sink that re-subscribes from
             inside its End handler would be appended to the old list (so the
             source would not be restarted) and then immediately discarded. */
          let sinks = state.sinks;
          state.sinks = Rebel.Array.makeEmpty();
          Rebel.Array.forEach(sinks, sink => sink(. End));
        }
      );
    };

    /* Hand the sink its own talkback into the shared subscription. */
    sink(.
      Start(
        (. signal) =>
          switch (signal) {
          | Close =>
            state.sinks = Rebel.Array.filter(state.sinks, x => x !== sink);
            /* Last sink gone: cancel the underlying source as well. */
            if (Rebel.Array.size(state.sinks) === 0) {
              state.talkback(. Close);
            };
          | Pull when !state.gotSignal =>
            state.gotSignal = true;
            state.talkback(. signal);
          | Pull => ()
          },
      ),
    );
  };
};