1open Wonka_types;
2open Wonka_helpers;
3
/* Mutable bookkeeping shared by the callbacks that `combine` installs on its
   two input sources and on the talkback it hands to the sink. */
type combineStateT('a, 'b) = {
  /* --- source A --- */
  /* Talkback received in source A's Start signal; starts out as the
     placeholder until that signal arrives. */
  mutable talkbackA: (. talkbackT) => unit,
  /* Last value recorded from source A, or None if nothing was recorded yet. */
  mutable lastValA: option('a),
  /* --- source B --- */
  /* Talkback received in source B's Start signal; starts out as the
     placeholder until that signal arrives. */
  mutable talkbackB: (. talkbackT) => unit,
  /* Last value recorded from source B, or None if nothing was recorded yet. */
  mutable lastValB: option('b),
  /* --- shared flags --- */
  /* Set when a Pull has been forwarded to both sources and cleared when
     either source pushes; further Pulls are dropped while it is set. */
  mutable gotSignal: bool,
  /* Counts End signals seen so far; only the first End increments it. */
  mutable endCounter: int,
  /* True once the sink sent Close or End has been forwarded to the sink. */
  mutable ended: bool,
};
13
/* combine(sourceA)(sourceB)(sink)

   Subscribes to both sources and emits a tuple `(a, b)` to the sink every
   time either source pushes, provided a value from the other source has
   already been recorded. The tuple pairs the new value with the other
   source's last recorded value.

   - The first End received (from either source) is only counted; the next
     End marks the combined source as ended and forwards End to the sink.
   - The sink receives its Start (carrying the combined talkback) only after
     both sources have been subscribed.
   - A Pull from the sink is forwarded to both sources, and further Pulls are
     dropped until one of the sources pushes again.
   - A Close from the sink marks the combined source as ended and is
     forwarded to both sources. */
let combine = sourceA =>
  curry(sourceB =>
    curry(sink => {
      let state = {
        talkbackA: talkbackPlaceholder,
        talkbackB: talkbackPlaceholder,
        lastValA: None,
        lastValB: None,
        gotSignal: false,
        endCounter: 0,
        ended: false,
      };

      /* Completion handling shared by both sources: the first End is only
         counted, a later End finishes the sink unless already ended. */
      let onSourceEnd = () =>
        if (state.endCounter < 1) {
          state.endCounter = state.endCounter + 1;
        } else if (!state.ended) {
          state.ended = true;
          sink(. End);
        };

      sourceA((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkbackA = tb
        | Push(a) =>
          switch (state.lastValB) {
          | None =>
            /* Nothing from B yet: just remember the value. */
            state.lastValA = Some(a);
            state.gotSignal = false;
          | Some(b) =>
            /* Once ended, pushes are ignored without touching the state. */
            if (!state.ended) {
              state.lastValA = Some(a);
              state.gotSignal = false;
              sink(. Push((a, b)));
            }
          }
        | End => onSourceEnd()
        }
      );

      sourceB((. signal) =>
        switch (signal) {
        | Start(tb) => state.talkbackB = tb
        | Push(b) =>
          switch (state.lastValA) {
          | None =>
            /* Nothing from A yet: just remember the value. */
            state.lastValB = Some(b);
            state.gotSignal = false;
          | Some(a) =>
            /* Once ended, pushes are ignored without touching the state. */
            if (!state.ended) {
              state.lastValB = Some(b);
              state.gotSignal = false;
              sink(. Push((a, b)));
            }
          }
        | End => onSourceEnd()
        }
      );

      /* Talkback handed to the sink; every signal is ignored after `ended`. */
      let talkback =
        (. signal) =>
          switch (signal) {
          | Close when !state.ended =>
            state.ended = true;
            state.talkbackA(. Close);
            state.talkbackB(. Close);
          | Pull when !state.ended && !state.gotSignal =>
            /* Gate further Pulls until one of the sources answers. */
            state.gotSignal = true;
            state.talkbackA(. Pull);
            state.talkbackB(. Pull);
          | Close
          | Pull => ()
          };

      sink(. Start(talkback));
    })
  );