import { Source, SignalKind, TalkbackKind } from './types'; import { push, start, talkbackPlaceholder } from './helpers'; interface ObservableSubscription { closed?: boolean; unsubscribe(): void; } interface ObservableObserver { next(value: T): void; error(error: any): void; complete(): void; } interface Observable { subscribe(observer: ObservableObserver): ObservableSubscription; } const observableSymbol = (): symbol => (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable')); export function fromObservable(input: Observable): Source { input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input; return sink => { const subscription = input.subscribe({ next(value: T) { sink(push(value)); }, complete() { sink(SignalKind.End); }, error() { /*noop*/ }, }); sink( start(signal => { if (signal === TalkbackKind.Close) subscription.unsubscribe(); }) ); }; } export function toObservable(source: Source): Observable { return { subscribe(observer: ObservableObserver) { let talkback = talkbackPlaceholder; let ended = false; source(signal => { if (ended) { /*noop*/ } else if (signal === SignalKind.End) { ended = true; observer.complete(); } else if (signal.tag === SignalKind.Start) { (talkback = signal[0])(TalkbackKind.Pull); } else { observer.next(signal[0]); talkback(TalkbackKind.Pull); } }); const subscription = { closed: false, unsubscribe() { subscription.closed = true; ended = true; talkback(TalkbackKind.Close); }, }; return subscription; }, [observableSymbol()]() { return this; }, }; }