// Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

/** A definition of the ES Observable Subscription type that is returned by
 * {@link Observable.subscribe}
 *
 * @remarks
 * The Subscription in ES Observables is a handle that is held while the Observable is actively
 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
 * ongoing subscription and end the {@link Observable} early.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface ObservableSubscription {
  /** A boolean flag indicating whether the subscription is closed.
   * @remarks
   * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
   * has terminated. No new values are expected.
   *
   * @readonly
   */
  closed?: boolean;
  /** Cancels the subscription.
   * @remarks
   * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
   * subsequently not be called at all. The subscription will be terminated and become inactive.
   */
  unsubscribe(): void;
}

/** A definition of the ES Observable Observer type that is used to receive data from an
 * {@link Observable}.
 *
 * @remarks
 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
 * an {@link Observable} as it issues them.
 *
 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
 * specification of an Observer.
 */
interface ObservableObserver<T> {
  /** Callback for the Observable issuing new values.
   * @param value - The value that the {@link Observable} is sending.
   */
  next(value: T): void;
  /** Callback for the Observable encountering an error, terminating it.
   * @param error - The error that the {@link Observable} has encountered.
   */
  error?(error: any): void;
  /** Callback for the Observable ending, after all values have been issued. */
  complete?(): void;
}

/** A looser definition of ES Observable-like types that is used for interoperability.
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. When converting from an Observable to a {@link Source},
 * this looser type is accepted as an input.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 * @see {@link Observable} for the full ES Observable type.
 */
interface ObservableLike<T> {
  /**
   * Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable]?(): Observable<T>;
}

/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
 * ecosystem.
 *
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
 * functionality to Observables, it provides utilities to cleanly convert to and from Observables.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface Observable<T> {
  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param onNext - Callback for the Observable issuing new values.
   * @param onError - Callback for the Observable encountering an error, terminating it.
   * @param onComplete - Callback for the Observable ending, after all values have been issued.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   */
  subscribe(
    onNext: (value: T) => any,
    onError?: (error: any) => any,
    onComplete?: () => any
  ): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable](): Observable<T>;
}

/** Returns the well-known symbol specifying the default ES Observable.
 * @privateRemarks
 * This symbol is used to mark an object as a default ES Observable. By the specification, an object
 * that abides by the default Observable implementation must carry a method set to this well-known
 * symbol that returns the Observable implementation. It's common for this object to be an
 * Observable itself and return itself on this method.
 *
 * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
 * between Observable implementations.
 *
 * @internal
 */
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';

/** Converts an ES Observable to a {@link Source}.
 * @param input - The {@link ObservableLike} object that will be converted.
 * @returns A {@link Source} wrapping the passed Observable.
 *
 * @remarks
 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and
 * the subscription starts, internally, it'll subscribe to the passed Observable, passing through
 * all of the Observable's values. As such, this utility provides intercompatibility converting from
 * standard Observables to Wonka Sources.
 *
 * @throws
 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
 * not support or expect errors to be handled by streams.
 */
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
  return sink => {
    // Prefer the object's default Observable (exposed via the well-known symbol) when it has one;
    // otherwise subscribe to the passed object directly.
    const subscription = (
      input[observableSymbol()] ? input[observableSymbol()]!() : input
    ).subscribe({
      next(value: T) {
        sink(push(value));
      },
      complete() {
        sink(SignalKind.End);
      },
      error(error) {
        // Sources have no error channel, hence the error is re-thrown.
        throw error;
      },
    });
    // The talkback only reacts to Close signals, which cancel the Observable subscription.
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) subscription.unsubscribe();
      })
    );
  };
}

/** Converts a {@link Source} to an ES Observable.
 * @param source - The {@link Source} that will be converted.
 * @returns An {@link Observable} wrapping the passed Source.
 *
 * @remarks
 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
 * internally subscribes to the Wonka Source and pulls new values. As such, this utility provides
 * intercompatibility converting from Wonka Sources to standard ES Observables.
 *
 * The returned subscription's `closed` flag becomes `true` both when the Source ends and when
 * `unsubscribe` is called. Calling `unsubscribe` more than once, or after the Source has ended,
 * is a no-op.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  return {
    subscribe(
      next: ObservableObserver<T> | ((value: T) => any),
      error?: (error: any) => any | undefined,
      complete?: () => any | undefined
    ) {
      // Accept either an observer object or the positional callback form.
      const observer: ObservableObserver<T> =
        typeof next === 'object' ? next : { next, error, complete };
      let talkback = talkbackPlaceholder;
      let ended = false;
      // Declared before subscribing to the source, so that a source which ends synchronously
      // can already mark the subscription as closed.
      const subscription = {
        closed: false,
        unsubscribe() {
          // Already terminated: don't send another Close signal to the source.
          if (ended) return;
          subscription.closed = true;
          ended = true;
          talkback(TalkbackKind.Close);
        },
      };
      source(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === SignalKind.End) {
          ended = true;
          subscription.closed = true;
          if (observer.complete) observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          // Store the talkback and immediately request the first value.
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          // Request the next value after each one was delivered.
          talkback(TalkbackKind.Pull);
        }
      });
      return subscription;
    },
    [observableSymbol()]() {
      return this;
    },
  };
}