1open Wonka_types;
2open Wonka_helpers;
3
4type publishStateT = {
5 mutable talkback: (. talkbackT) => unit,
6 mutable ended: bool,
7};
8
9let publish = source => {
10 let state: publishStateT = {talkback: talkbackPlaceholder, ended: false};
11
12 source((. signal) =>
13 switch (signal) {
14 | Start(x) =>
15 state.talkback = x;
16 x(. Pull);
17 | Push(_) =>
18 if (!state.ended) {
19 state.talkback(. Pull);
20 }
21 | End => state.ended = true
22 }
23 );
24
25 {
26 unsubscribe: () =>
27 if (!state.ended) {
28 state.ended = true;
29 state.talkback(. Close);
30 },
31 };
32};