1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks that `buffer` installs for one
   sink subscription. */
type bufferStateT('a) = {
  /* Values pushed by the source that have not been emitted yet; replaced by a
     fresh queue each time the notifier triggers a flush. */
  mutable buffer: Rebel.MutableQueue.t('a),
  /* Talkback handed over by the source's `Start` signal; holds
     `talkbackPlaceholder` until then. */
  mutable sourceTalkback: (. talkbackT) => unit,
  /* Talkback handed over by the notifier's `Start` signal; holds
     `talkbackPlaceholder` until then. */
  mutable notifierTalkback: (. talkbackT) => unit,
  /* Set to true once the source or the notifier has ended, or the sink has
     sent `Close`; later `Push`/`End` signals and talkback calls are ignored. */
  mutable ended: bool,
};
10
/* `buffer(notifier, source, sink)` queues every value pushed by `source` and
   forwards the queued values to `sink` as a single array each time `notifier`
   pushes a value (the notifier's own value is discarded).

   - The notifier is subscribed only once the source has sent `Start`.
   - Each value received from the source is followed by a `Pull` to the source.
   - When either the source or the notifier ends, the other one is closed, the
     currently queued values are pushed as a final array (even when the queue
     is empty) and `End` is sent to the sink.
   - A `Close` from the sink closes both the source and the notifier; a `Pull`
     from the sink is forwarded to the source only. */
let buffer = (notifier: sourceT('a)) =>
  curry((source: sourceT('b)) =>
    curry((sink: sinkT(array('b))) => {
      let state: bufferStateT('b) = {
        buffer: Rebel.MutableQueue.make(),
        sourceTalkback: talkbackPlaceholder,
        notifierTalkback: talkbackPlaceholder,
        ended: false,
      };

      /* Sends everything queued so far downstream as one array. */
      let emitQueued = () =>
        sink(. Push(Rebel.MutableQueue.toArray(state.buffer)));

      /* Final emission used by both termination paths: flush, then end. */
      let flushAndEnd = () => {
        emitQueued();
        sink(. End);
      };

      source((. sourceSignal) =>
        switch (sourceSignal) {
        | Start(sourceTb) =>
          state.sourceTalkback = sourceTb;

          /* Subscribe to the notifier now that the source has started. */
          notifier((. notifierSignal) =>
            switch (notifierSignal) {
            | Start(notifierTb) =>
              state.notifierTalkback = notifierTb;
              state.notifierTalkback(. Pull);
            | Push(_) =>
              if (!state.ended) {
                emitQueued();
                /* Start a fresh queue for the next window. */
                state.buffer = Rebel.MutableQueue.make();
                state.notifierTalkback(. Pull);
              }
            | End =>
              if (!state.ended) {
                /* Notifier finished first: stop the source, then flush. */
                state.ended = true;
                state.sourceTalkback(. Close);
                flushAndEnd();
              }
            }
          );
        | Push(value) =>
          if (!state.ended) {
            Rebel.MutableQueue.add(state.buffer, value);
            state.sourceTalkback(. Pull);
          }
        | End =>
          if (!state.ended) {
            /* Source finished first: stop the notifier, then flush. */
            state.ended = true;
            state.notifierTalkback(. Close);
            flushAndEnd();
          }
        }
      );

      sink(.
        Start(
          (. request) =>
            switch (request) {
            /* Nothing to forward once the stream has terminated. */
            | _ when state.ended => ()
            | Close =>
              state.ended = true;
              state.sourceTalkback(. Close);
              state.notifierTalkback(. Close);
            | Pull => state.sourceTalkback(. Pull)
            },
        ),
      );
    })
  );