1open Wonka_types;
2
/* Signal tag that accompanies every call made to, or received from, a callbag.
   The trailing numeric comments record the integer each constructor is meant
   to line up with; presumably these are the callbag protocol's 0/1/2 type
   codes and the compiler's encoding of constant constructors - TODO confirm
   against the generated JavaScript. */
type callbagSignal =
  | CALLBAG_START /* 0 */
  | CALLBAG_DATA /* 1 */
  | CALLBAG_END /* 2 */;
7
/* Opaque payload passed as the second argument of a callbag call. It has no
   constructors; in this file values of this type are only produced and
   consumed through the `unsafe_*` identity casts. */
type callbagData('a);
/* Uncurried one-argument function that receives a signal and returns unit. */
type callbagTalkback = (. callbagSignal) => unit;
/* Uncurried two-argument function: a signal plus its opaque payload. */
type callbagT('a) = (. callbagSignal, callbagData('a)) => unit;
11
/* Unchecked casts between the opaque payload and the concrete values it may
   stand for. All four are bound to "%identity", so no conversion or
   validation is performed; picking the right cast for the signal at hand is
   the caller's responsibility. */

/* Reads the payload as a callbag; used by `toCallbag` on CALLBAG_START. */
external unsafe_getCallbag: callbagData('a) => callbagT('a) = "%identity";
/* Reads the payload as a talkback; used by `fromCallbag` on CALLBAG_START. */
external unsafe_getTalkback: callbagData('a) => callbagTalkback = "%identity";
/* Reads the payload as the emitted value; used by `fromCallbag` on CALLBAG_DATA. */
external unsafe_getValue: callbagData('a) => 'a = "%identity";
/* Re-types any value as a payload so it can be handed to a callbag. */
external unsafe_wrap: 'any => callbagData('a) = "%identity";
16
/* Adapts a callbag into a Wonka source.

   When the resulting source is given a `sink`, the callbag is greeted with
   CALLBAG_START and a listener that re-emits every callbag signal on `sink`:
   - CALLBAG_START: the payload is read as the callbag's talkback and passed
     on as `Start`, wrapped so that `Pull` is forwarded as CALLBAG_DATA and
     `Close` as CALLBAG_END;
   - CALLBAG_DATA: the payload is read as a value and passed on as `Push`;
   - CALLBAG_END: passed on as `End`. */
let fromCallbag = callbag =>
  curry(sink => {
    let listener =
      (. kind, payload) =>
        switch (kind) {
        | CALLBAG_START =>
          /* On the start handshake the payload carries the callbag's talkback. */
          let upstream = unsafe_getTalkback(payload);
          let relay = (
            (. request: talkbackT) =>
              switch (request) {
              | Pull => upstream(. CALLBAG_DATA)
              | Close => upstream(. CALLBAG_END)
              }
          );
          sink(. Start(relay));
        | CALLBAG_DATA => sink(. Push(unsafe_getValue(payload)))
        | CALLBAG_END => sink(. End)
        };
    /* Greet the callbag, handing it the listener as the payload. */
    callbag(. CALLBAG_START, unsafe_wrap(listener));
  });
37
/* Adapts a Wonka source into a callbag.

   The returned callbag reacts only to CALLBAG_START; any other signal is
   ignored. On CALLBAG_START the payload is read as the greeting callbag and
   `source` is started with an observer that forwards its signals to it:
   - `Start`: sends CALLBAG_START with a talkback that maps CALLBAG_DATA to
     `Pull`, CALLBAG_END to `Close`, and ignores CALLBAG_START;
   - `Push`: sends CALLBAG_DATA with the wrapped value;
   - `End`: sends CALLBAG_END with a wrapped unit payload. */
let toCallbag = source =>
  curry((. kind, payload) =>
    if (kind === CALLBAG_START) {
      let downstream = unsafe_getCallbag(payload);
      let observer =
        (. event) =>
          switch (event) {
          | Start(requestFn) =>
            /* Translate callbag-side requests into Wonka talkback signals. */
            let handshake = (request: callbagSignal) =>
              switch (request) {
              | CALLBAG_START => ()
              | CALLBAG_DATA => requestFn(. Pull)
              | CALLBAG_END => requestFn(. Close)
              };
            downstream(. CALLBAG_START, unsafe_wrap(handshake));
          | Push(value) => downstream(. CALLBAG_DATA, unsafe_wrap(value))
          | End => downstream(. CALLBAG_END, unsafe_wrap())
          };
      source(observer);
    }
  );