Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, SignalKind, TalkbackKind } from './types'; 2import { push, start, talkbackPlaceholder } from './helpers'; 3 4interface ObservableSubscription { 5 closed?: boolean; 6 unsubscribe(): void; 7} 8 9interface ObservableObserver<T> { 10 next(value: T): void; 11 error(error: any): void; 12 complete(): void; 13} 14 15interface Observable<T> { 16 subscribe(observer: ObservableObserver<T>): ObservableSubscription; 17} 18 19const observableSymbol = (): symbol | string => Symbol.observable || '@@observable'; 20 21export function fromObservable<T>(input: Observable<T>): Source<T> { 22 input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input; 23 return sink => { 24 const subscription = input.subscribe({ 25 next(value: T) { 26 sink(push(value)); 27 }, 28 complete() { 29 sink(SignalKind.End); 30 }, 31 error() { 32 /*noop*/ 33 }, 34 }); 35 sink( 36 start(signal => { 37 if (signal === TalkbackKind.Close) subscription.unsubscribe(); 38 }) 39 ); 40 }; 41} 42 43export function toObservable<T>(source: Source<T>): Observable<T> { 44 return { 45 subscribe(observer: ObservableObserver<T>) { 46 let talkback = talkbackPlaceholder; 47 let ended = false; 48 source(signal => { 49 if (ended) { 50 /*noop*/ 51 } else if (signal === SignalKind.End) { 52 ended = true; 53 observer.complete(); 54 } else if (signal.tag === SignalKind.Start) { 55 (talkback = signal[0])(TalkbackKind.Pull); 56 } else { 57 observer.next(signal[0]); 58 talkback(TalkbackKind.Pull); 59 } 60 }); 61 const subscription = { 62 closed: false, 63 unsubscribe() { 64 subscription.closed = true; 65 ended = true; 66 talkback(TalkbackKind.Close); 67 }, 68 }; 69 return subscription; 70 }, 71 [observableSymbol()]() { 72 return this; 73 }, 74 }; 75}