Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.1.0 1.9 kB view raw
1import { Source, SignalKind, TalkbackKind } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3 4interface ObservableSubscription { 5 closed?: boolean; 6 unsubscribe(): void; 7} 8 9interface ObservableObserver<T> { 10 next(value: T): void; 11 error(error: any): void; 12 complete(): void; 13} 14 15interface Observable<T> { 16 subscribe(observer: ObservableObserver<T>): ObservableSubscription; 17} 18 19const observableSymbol = (): symbol => 20 (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable')); 21 22export function fromObservable<T>(input: Observable<T>): Source<T> { 23 input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input; 24 return sink => { 25 const subscription = input.subscribe({ 26 next(value: T) { 27 sink(push(value)); 28 }, 29 complete() { 30 sink(SignalKind.End); 31 }, 32 error() { 33 /*noop*/ 34 }, 35 }); 36 sink( 37 start(signal => { 38 if (signal === TalkbackKind.Close) subscription.unsubscribe(); 39 }) 40 ); 41 }; 42} 43 44export function toObservable<T>(source: Source<T>): Observable<T> { 45 return { 46 subscribe(observer: ObservableObserver<T>) { 47 let talkback = talkbackPlaceholder; 48 let ended = false; 49 source(signal => { 50 if (ended) { 51 /*noop*/ 52 } else if (signal === SignalKind.End) { 53 ended = true; 54 observer.complete(); 55 } else if (signal.tag === SignalKind.Start) { 56 (talkback = signal[0])(TalkbackKind.Pull); 57 } else { 58 observer.next(signal[0]); 59 talkback(TalkbackKind.Pull); 60 } 61 }); 62 const subscription = { 63 closed: false, 64 unsubscribe() { 65 subscription.closed = true; 66 ended = true; 67 talkback(TalkbackKind.Close); 68 }, 69 }; 70 return subscription; 71 }, 72 [observableSymbol()]() { 73 return this; 74 }, 75 }; 76}