Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, SignalKind, TalkbackKind } from './types'; 2import { push, start } from './helpers'; 3 4interface Callbag<I, O> { 5 (t: 0, d: Callbag<O, I>): void; 6 (t: 1, d: I): void; 7 (t: 2, d?: any): void; 8} 9 10export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> { 11 return sink => { 12 callbag(0, (signal: number, data: any) => { 13 if (signal === 0) { 14 sink( 15 start(signal => { 16 if (signal === TalkbackKind.Pull) { 17 data(1); 18 } else { 19 data(2); 20 } 21 }) 22 ); 23 } else if (signal === 1) { 24 sink(push(data)); 25 } else if (signal === 2) { 26 sink(SignalKind.End); 27 } 28 }); 29 }; 30} 31 32export function toCallbag<T>(source: Source<T>): Callbag<any, T> { 33 return (signal: number, sink: any) => { 34 if (signal === 0) { 35 source(signal => { 36 if (signal === SignalKind.End) { 37 sink(2); 38 } else if (signal.tag === SignalKind.Start) { 39 sink(0, (num: number) => { 40 if (num === 1) { 41 signal[0](TalkbackKind.Pull); 42 } else if (num === 2) { 43 signal[0](TalkbackKind.Close); 44 } 45 }); 46 } else { 47 sink(1, signal[0]); 48 } 49 }); 50 } 51 }; 52}