Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

feat: Completely fulfill the Observable spec in toObservable (#132)

Changed files
+32 -8
.changeset
src
+5
.changeset/seven-meals-film.md
···
+
---
+
'wonka': patch
+
---
+
+
Fix implementation of Observable spec as such that Observable.subscribe(onNext, onError, onComplete) becomes valid.
+27 -8
src/observable.ts
···
interface ObservableObserver<T> {
next(value: T): void;
-
error(error: any): void;
-
complete(): void;
+
error?(error: any): void;
+
complete?(): void;
+
}
+
+
interface ObservableLike<T> {
+
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
+
[Symbol.observable]?(): Observable<T>;
}
interface Observable<T> {
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
+
+
subscribe(
+
onNext: (value: T) => any,
+
onError?: (error: any) => any,
+
onComplete?: () => any
+
): ObservableSubscription;
+
+
[Symbol.observable](): Observable<T>;
}
-
const observableSymbol = (): symbol | string => Symbol.observable || '@@observable';
+
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';
-
export function fromObservable<T>(input: Observable<T>): Source<T> {
+
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input;
return sink => {
const subscription = input.subscribe({
···
complete() {
sink(SignalKind.End);
},
-
error() {
-
/*noop*/
+
error(error) {
+
throw error;
},
});
sink(
···
export function toObservable<T>(source: Source<T>): Observable<T> {
return {
-
subscribe(observer: ObservableObserver<T>) {
+
subscribe(
+
next: ObservableObserver<T> | ((value: T) => any),
+
error?: (error: any) => any | undefined,
+
complete?: () => any | undefined
+
) {
+
const observer: ObservableObserver<T> =
+
typeof next == 'object' ? next : { next, error, complete };
let talkback = talkbackPlaceholder;
let ended = false;
source(signal => {
···
/*noop*/
} else if (signal === SignalKind.End) {
ended = true;
-
observer.complete();
+
if (observer.complete) observer.complete();
} else if (signal.tag === SignalKind.Start) {
(talkback = signal[0])(TalkbackKind.Pull);
} else {