Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.3.5 1.9 kB view raw
1import { Source, SignalKind } from './types'; 2import { push, start } from './helpers'; 3 4/** A definition of the Callbag type as per its specification. 5 * @see {@link https://github.com/callbag/callbag} for the Callbag specification. 6 */ 7interface Callbag<I, O> { 8 (t: 0, d: Callbag<O, I>): void; 9 (t: 1, d: I): void; 10 (t: 2, d?: any): void; 11} 12 13/** Converts a Callbag to a {@link Source}. 14 * @param callbag - The {@link Callbag} object that will be converted. 15 * @returns A {@link Source} wrapping the passed Callbag. 16 * 17 * @remarks 18 * This converts a Callbag to a {@link Source}. When this Source receives a {@link Sink} and 19 * the subscription starts, internally, it'll subscribe to the passed Callbag, passing through 20 * all of its emitted values. 21 */ 22export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> { 23 return sink => { 24 callbag(0, (signal: number, data: any) => { 25 if (signal === 0) { 26 sink( 27 start(signal => { 28 data(signal + 1); 29 }) 30 ); 31 } else if (signal === 1) { 32 sink(push(data)); 33 } else { 34 sink(SignalKind.End); 35 } 36 }); 37 }; 38} 39 40/** Converts a {@link Source} to a Callbag. 41 * @param source - The {@link Source} that will be converted. 42 * @returns A {@link Callbag} wrapping the passed Source. 43 * 44 * @remarks 45 * This converts a {@link Source} to a {@link Callbag}. When this Callbag is subscribed to, it 46 * internally subscribes to the Wonka Source and pulls new values. 47 */ 48export function toCallbag<T>(source: Source<T>): Callbag<any, T> { 49 return (signal: number, sink: any) => { 50 if (signal === 0) { 51 source(signal => { 52 if (signal === SignalKind.End) { 53 sink(2); 54 } else if (signal.tag === SignalKind.Start) { 55 sink(0, (num: number) => { 56 if (num < 3) signal[0](num - 1); 57 }); 58 } else { 59 sink(1, signal[0]); 60 } 61 }); 62 } 63 }; 64}