1open Wonka_types;
2
/* Signal tag passed as the first argument of a callbag.
   Each constructor carries a `genType.as` annotation so that the exported
   type uses the numeric tags 0, 1 and 2 respectively. */
[@genType]
type callbagSignal =
  | [@genType.as 0] CALLBAG_START /* 0 */
  | [@genType.as 1] CALLBAG_DATA /* 1 */
  | [@genType.as 2] CALLBAG_END /* 2 */;
8
/* Abstract payload passed as the second argument of a callbag.
   It has no constructors in this file; values are only produced and read
   through the `unsafe_*` identity casts declared in this module. */
[@genType]
type callbagData('a);
11
/* Uncurried one-argument function that receives a callbag signal and
   returns unit. `fromCallbag` reads the CALLBAG_START payload as a value
   of this type. */
[@genType]
type callbagTalkback = (. callbagSignal) => unit;
14
/* A callbag: a function taking a signal tag and a payload, returning unit.
   For genType, the exported type is imported from "../shims/Js.shim"
   rather than generated from this definition. */
[@genType.import "../shims/Js.shim"]
type callbagT('a) = (callbagSignal, callbagData('a)) => unit;
17
/* Unchecked casts between the abstract `callbagData` payload and the
   concrete things it may hold. All four are bound to "%identity", so they
   change only the static type; nothing verifies that the payload really
   has the requested shape. Callers must pick the cast that matches the
   signal the payload arrived with. */

/* Reinterprets a payload as a callbag. */
external unsafe_getCallbag: callbagData('a) => callbagT('a) = "%identity";
/* Reinterprets a payload as a talkback function. */
external unsafe_getTalkback: callbagData('a) => callbagTalkback = "%identity";
/* Reinterprets a payload as a plain value of type 'a. */
external unsafe_getValue: callbagData('a) => 'a = "%identity";
/* Reinterprets any value as a payload. */
external unsafe_wrap: 'any => callbagData('a) = "%identity";
22
/* Adapts a callbag into a Wonka source.
 *
 * The function handed to `curry` receives a sink and greets `callbag` with
 * CALLBAG_START plus a wrapped sink. The wrapped sink translates incoming
 * callbag signals as follows:
 *   - CALLBAG_START: the payload is read as the callbag's talkback and the
 *     sink receives `Start` with a talkback that maps
 *     Pull -> CALLBAG_DATA and Close -> CALLBAG_END.
 *   - CALLBAG_DATA: the payload is read as a value and forwarded as `Push`.
 *   - CALLBAG_END: forwarded as `End`; the payload is not read.
 */
[@genType]
let fromCallbag = (callbag: callbagT('a)): sourceT('a) =>
  curry(sink => {
    /* Handshake: expose the callbag's talkback to the sink in Wonka terms. */
    let onStart = payload => {
      let upstream = unsafe_getTalkback(payload);
      let relay = (
        (. request: talkbackT) =>
          switch (request) {
          | Pull => upstream(. CALLBAG_DATA)
          | Close => upstream(. CALLBAG_END)
          }
      );
      sink(. Start(relay));
    };

    /* The sink given to the callbag; it dispatches on the signal tag. */
    let callbagSink =
      (. kind, payload) =>
        switch (kind) {
        | CALLBAG_START => onStart(payload)
        | CALLBAG_DATA => sink(. Push(unsafe_getValue(payload)))
        | CALLBAG_END => sink(. End)
        };

    callbag(CALLBAG_START, unsafe_wrap(callbagSink));
  });
44
/* Adapts a Wonka source into a callbag.
 *
 * The function handed to `curry` reacts only to CALLBAG_START; every other
 * signal is ignored. On CALLBAG_START the payload is read as the consumer
 * callbag and `source` is called with a sink that translates Wonka signals:
 *   - Start(talkback): the consumer receives CALLBAG_START with a talkback
 *     mapping CALLBAG_DATA -> Pull and CALLBAG_END -> Close
 *     (CALLBAG_START sent to that talkback does nothing).
 *   - Push(value): the consumer receives CALLBAG_DATA with the value.
 *   - End: the consumer receives CALLBAG_END with a wrapped unit payload.
 */
[@genType]
let toCallbag = (source: sourceT('a)): callbagT('a) =>
  curry((kind, payload) =>
    if (kind === CALLBAG_START) {
      let consumer = unsafe_getCallbag(payload);
      source((. event) =>
        switch (event) {
        | Start(sourceTalkback) =>
          /* Callbag-side talkback that forwards requests to the source. */
          let relay = (request: callbagSignal) =>
            switch (request) {
            | CALLBAG_START => ()
            | CALLBAG_DATA => sourceTalkback(. Pull)
            | CALLBAG_END => sourceTalkback(. Close)
            };
          consumer(CALLBAG_START, unsafe_wrap(relay));
        | Push(value) => consumer(CALLBAG_DATA, unsafe_wrap(value))
        | End => consumer(CALLBAG_END, unsafe_wrap())
        }
      );
    }
  );