1import { Source, SignalKind } from './types';
2import { push, start } from './helpers';
3
4/** A definition of the Callbag type as per its specification.
5 * @see {@link https://github.com/callbag/callbag} for the Callbag specification.
6 */
7interface Callbag<I, O> {
8 (t: 0, d: Callbag<O, I>): void;
9 (t: 1, d: I): void;
10 (t: 2, d?: any): void;
11}
12
13/** Converts a Callbag to a {@link Source}.
14 * @param callbag - The {@link Callbag} object that will be converted.
15 * @returns A {@link Source} wrapping the passed Callbag.
16 *
17 * @remarks
18 * This converts a Callbag to a {@link Source}. When this Source receives a {@link Sink} and
19 * the subscription starts, internally, it'll subscribe to the passed Callbag, passing through
20 * all of its emitted values.
21 */
22export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {
23 return sink => {
24 callbag(0, (signal: number, data: any) => {
25 if (signal === 0) {
26 sink(
27 start(signal => {
28 data(signal + 1);
29 })
30 );
31 } else if (signal === 1) {
32 sink(push(data));
33 } else {
34 sink(SignalKind.End);
35 }
36 });
37 };
38}
39
40/** Converts a {@link Source} to a Callbag.
41 * @param source - The {@link Source} that will be converted.
42 * @returns A {@link Callbag} wrapping the passed Source.
43 *
44 * @remarks
45 * This converts a {@link Source} to a {@link Callbag}. When this Callbag is subscribed to, it
46 * internally subscribes to the Wonka Source and pulls new values.
47 */
48export function toCallbag<T>(source: Source<T>): Callbag<any, T> {
49 return (signal: number, sink: any) => {
50 if (signal === 0) {
51 source(signal => {
52 if (signal === SignalKind.End) {
53 sink(2);
54 } else if (signal.tag === SignalKind.Start) {
55 sink(0, (num: number) => {
56 if (num < 3) signal[0](num - 1);
57 });
58 } else {
59 sink(1, signal[0]);
60 }
61 });
62 }
63 };
64}