Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

/**
 * Key under which an object exposes its Observable interop method.
 * Reuses `Symbol.observable` when it exists, installs it when it does not,
 * and falls back to the string `'@@observable'` when `Symbol` is unavailable.
 */
const observableSymbol: unique symbol =
  typeof Symbol === 'function'
    ? (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable'))
    : '@@observable';

/** Handle returned by `Observable.subscribe`, used to cancel the subscription. */
interface ObservableSubscription {
  closed?: boolean;
  unsubscribe(): void;
}

/** Callbacks an Observable invokes on its subscriber. */
interface ObservableObserver<T> {
  next(value: T): void;
  error(error: any): void;
  complete(): void;
}

/** Minimal Observable shape this module consumes and produces. */
interface Observable<T> {
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;
  /** Optional interop method returning the actual Observable to subscribe to. */
  [observableSymbol]?(): Observable<T>;
}

/**
 * Converts an Observable into a Source.
 *
 * If `input` exposes a method under `observableSymbol`, that method's result
 * is the Observable that gets subscribed to; otherwise `input` itself is used.
 *
 * @param input - The Observable (or Observable-interop object) to wrap.
 * @returns A Source that pushes each `next` value to its sink, sends `End` on
 * `complete`, and unsubscribes when the sink's talkback receives `Close`.
 */
export function fromObservable<T>(input: Observable<T>): Source<T> {
  const interop = input[observableSymbol];
  // Call with `input` as receiver, exactly like `input[observableSymbol]()`.
  const observable: Observable<T> = interop ? interop.call(input) : input;
  return sink => {
    const subscription = observable.subscribe({
      next(value: T) {
        sink(push(value));
      },
      complete() {
        sink(SignalKind.End);
      },
      error() {
        // Deliberately ignored: the error is dropped and the sink receives no
        // signal for it.
        // NOTE(review): this leaves the sink without an End after an error —
        // confirm whether the signal protocol in ./types can express failures.
      },
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) subscription.unsubscribe();
      })
    );
  };
}

/**
 * Converts a Source into an Observable.
 *
 * Each `subscribe` call starts the source anew. The source is pulled once when
 * it starts and again after every value that is forwarded to `observer.next`.
 *
 * @param source - The Source to expose as an Observable.
 * @returns An Observable whose subscriptions report `closed === true` once the
 * source has ended or `unsubscribe()` has been called.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  const observable: Observable<T> = {
    subscribe(observer: ObservableObserver<T>) {
      let talkback = talkbackPlaceholder;
      let ended = false;

      // Created before the source is started so that a source which ends
      // synchronously can still mark the subscription as closed.
      const subscription: ObservableSubscription = {
        closed: false,
        unsubscribe() {
          // Idempotent: never send Close twice, nor after the source ended.
          if (ended) return;
          ended = true;
          subscription.closed = true;
          talkback(TalkbackKind.Close);
        },
      };

      source(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === SignalKind.End) {
          ended = true;
          subscription.closed = true;
          observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          // Remember the talkback and request the first value.
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          talkback(TalkbackKind.Pull);
        }
      });

      return subscription;
    },
  };
  // Observable interop: the object is its own Observable.
  observable[observableSymbol] = () => observable;
  return observable;
}