open Wonka_types;
open Wonka_helpers;

/* Mutable bookkeeping used by `combine`: it is shared between the two source
   subscriptions and the talkback that `combine` hands to its sink. */
type combineStateT('a, 'b) = {
  /* Talkback received from sourceA's Start signal (a placeholder until then). */
  mutable talkbackA: (.talkbackT) => unit,
  /* Talkback received from sourceB's Start signal (a placeholder until then). */
  mutable talkbackB: (.talkbackT) => unit,
  /* Most recent value pushed by sourceA, or None if it has not pushed yet. */
  mutable lastValA: option('a),
  /* Most recent value pushed by sourceB, or None if it has not pushed yet. */
  mutable lastValB: option('b),
  /* Set to true when a Pull has been forwarded to both sources; reset to
     false whenever either source pushes a value. */
  mutable gotSignal: bool,
  /* Number of End signals counted so far; only the first one increments it. */
  mutable endCounter: int,
  /* True once End has been forwarded to the sink or the sink sent Close. */
  mutable ended: bool,
};

/* Combines two sources into a single source of pairs.

   Both `sourceA` and `sourceB` are subscribed to immediately. The latest value
   of each is remembered; once both have produced at least one value, every
   further Push from either side emits `(a, b)` built from the new value and
   the other side's latest value.

   End is forwarded to the sink only when the second End signal arrives; the
   first one is merely counted. Nothing is emitted after the sink has sent
   Close or after End has been forwarded.

   Arguments are taken one at a time through the `curry` helper:
   `sourceA`, then `sourceB`, then the `sink` that receives the signals. */
let combine = sourceA => curry(sourceB => curry(sink => {
  let state = {
    talkbackA: talkbackPlaceholder,
    talkbackB: talkbackPlaceholder,
    lastValA: None,
    lastValB: None,
    gotSignal: false,
    endCounter: 0,
    ended: false
  };

  /* Shared End handling for both sources: the first End is only counted,
     any later End completes the sink exactly once. */
  let handleEnd = () =>
    if (state.endCounter < 1) {
      state.endCounter = state.endCounter + 1;
    } else if (!state.ended) {
      state.ended = true;
      sink(.End);
    };

  sourceA((.signal) => {
    switch (signal, state.lastValB) {
    | (Start(tb), _) => state.talkbackA = tb
    | (Push(a), None) => {
      /* sourceB has no value yet: remember `a` and allow another Pull. */
      state.lastValA = Some(a);
      state.gotSignal = false;
    }
    | (Push(a), Some(b)) when !state.ended => {
      state.lastValA = Some(a);
      state.gotSignal = false;
      sink(.Push((a, b)));
    }
    | (End, _) => handleEnd()
    /* Remaining case: a Push that arrives after the stream has ended. */
    | _ => ()
    }
  });

  sourceB((.signal) => {
    switch (signal, state.lastValA) {
    | (Start(tb), _) => state.talkbackB = tb
    | (Push(b), None) => {
      /* sourceA has no value yet: remember `b` and allow another Pull. */
      state.lastValB = Some(b);
      state.gotSignal = false;
    }
    | (Push(b), Some(a)) when !state.ended => {
      state.lastValB = Some(b);
      state.gotSignal = false;
      sink(.Push((a, b)));
    }
    | (End, _) => handleEnd()
    /* Remaining case: a Push that arrives after the stream has ended. */
    | _ => ()
    }
  });

  /* Talkback handed to the sink; it is inert once the stream has ended. */
  sink(.Start((.signal) => {
    if (!state.ended) {
      switch (signal) {
      | Close => {
        state.ended = true;
        state.talkbackA(.Close);
        state.talkbackB(.Close);
      }
      | Pull when !state.gotSignal => {
        /* Forward at most one Pull to each source until one of them pushes. */
        state.gotSignal = true;
        state.talkbackA(.signal);
        state.talkbackB(.signal);
      }
      | Pull => ()
      }
    };
  }));
}));