1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared between the sink passed to the source and the
   subscription handle returned by `publish`. */
type publishStateT = {
  /* Uncurried callback used to send talkback signals to the source. */
  mutable talkback: (.talkbackT) => unit,
  /* Set to true once the source has ended or the consumer has unsubscribed. */
  mutable ended: bool,
};
8
/* Starts `source` and keeps pulling values from it, discarding every value
   that is pushed. Returns a record whose `unsubscribe` function sends `Close`
   to the source's talkback, at most once and only if the source has not
   already ended. */
let publish = source => {
  let st: publishStateT = {
    /* Stand-in talkback until the source delivers its own via `Start`. */
    talkback: talkbackPlaceholder,
    ended: false,
  };

  /* Sink handed to the source: remembers the talkback and requests values. */
  let sink = (.signal) =>
    switch (signal) {
    | Start(tb) =>
      st.talkback = tb;
      /* Request the first value right away. */
      tb(.Pull);
    | Push(_) =>
      /* The pushed value is ignored; ask for the next one unless finished. */
      if (!st.ended) {
        st.talkback(.Pull);
      }
    | End => st.ended = true
    };

  source(sink);

  /* Closes the source once; later calls (or calls after `End`) do nothing. */
  let unsubscribe = () =>
    if (!st.ended) {
      st.ended = true;
      st.talkback(.Close);
    };

  {unsubscribe: unsubscribe};
};