1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start } from './helpers';
3
/**
 * Callable shape of a callbag as consumed and produced by the converters in
 * this file.
 *
 * The first argument selects the message type and decides what the second
 * argument carries:
 * - `0`: handshake; carries the peer callbag used to talk back.
 * - `1`: data; carries a value of type `I`.
 * - `2`: termination; may optionally carry an arbitrary payload.
 */
interface Callbag<I, O> {
  (type: 0, talkback: Callbag<O, I>): void;
  (type: 1, data: I): void;
  (type: 2, reason?: any): void;
}
9
/**
 * Adapts a callbag into a `Source`.
 *
 * When the returned source is given a sink, the callbag is greeted with type
 * `0` and a listener. Messages the callbag sends to that listener are
 * translated for the sink:
 * - type `0` becomes a start signal whose talkback forwards `1` to the
 *   callbag's talkback for `TalkbackKind.Pull` and `2` for anything else;
 * - type `1` becomes a push signal carrying the payload;
 * - type `2` becomes `SignalKind.End`.
 *
 * Any other message type is ignored.
 *
 * @param callbag - The callbag to subscribe to.
 * @returns A source that relays the callbag's messages to its sink.
 */
export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {
  return sink => {
    const listener = (type: number, payload: any): void => {
      switch (type) {
        case 0:
          // On the handshake the payload is the callbag's own talkback.
          sink(
            start(request => {
              payload(request === TalkbackKind.Pull ? 1 : 2);
            })
          );
          break;
        case 1:
          sink(push(payload));
          break;
        case 2:
          sink(SignalKind.End);
          break;
      }
    };

    callbag(0, listener);
  };
}
31
/**
 * Adapts a `Source` into a callbag.
 *
 * The returned callbag only reacts to a type `0` greeting; on it, the source
 * is started and its signals are translated for the greeting callbag:
 * - `SignalKind.End` is sent as type `2`;
 * - a start signal is sent as type `0` together with a talkback that maps
 *   `1` to `TalkbackKind.Pull` and `2` to `TalkbackKind.Close`, ignoring any
 *   other request;
 * - every other signal is sent as type `1` carrying the signal's value.
 *
 * @param source - The source to expose as a callbag.
 * @returns A callbag that subscribes to `source` when greeted.
 */
export function toCallbag<T>(source: Source<T>): Callbag<any, T> {
  return (type: number, downstream: any) => {
    // Anything other than the greeting is ignored.
    if (type !== 0) return;

    source(event => {
      if (event === SignalKind.End) {
        downstream(2);
        return;
      }

      if (event.tag !== SignalKind.Start) {
        downstream(1, event[0]);
        return;
      }

      downstream(0, (request: number) => {
        switch (request) {
          case 1:
            event[0](TalkbackKind.Pull);
            break;
          case 2:
            event[0](TalkbackKind.Close);
            break;
        }
      });
    });
  };
}