Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.2.6 8.5 kB view raw
import { Source, SignalKind, TalkbackKind } from './types';
import { push, start, talkbackPlaceholder } from './helpers';

declare global {
  interface SymbolConstructor {
    readonly observable: symbol;
  }
}

/** A definition of the ES Observable Subscription type that is returned by
 * {@link Observable.subscribe}
 *
 * @remarks
 * The Subscription in ES Observables is a handle that is held while the Observable is actively
 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
 * ongoing subscription and end the {@link Observable} early.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface ObservableSubscription {
  /** A boolean flag indicating whether the subscription is closed.
   * @remarks
   * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
   * has terminated. No new values are expected.
   *
   * @readonly
   */
  closed: boolean;
  /** Cancels the subscription.
   * @remarks
   * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
   * subsequently not be called at all. The subscription will be terminated and become inactive.
   */
  unsubscribe(): void;
}

/** A definition of the ES Observable Observer type that is used to receive data from an
 * {@link Observable}.
 *
 * @remarks
 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
 * an {@link Observable} as it issues them.
 *
 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
 * specification of an Observer.
 */
interface ObservableObserver<T> {
  /** Callback for the Observable issuing new values.
   * @param value - The value that the {@link Observable} is sending.
   */
  next(value: T): void;
  /** Callback for the Observable encountering an error, terminating it.
   * @param error - The error that the {@link Observable} has encountered.
   */
  error?(error: any): void;
  /** Callback for the Observable ending, after all values have been issued. */
  complete?(): void;
}

/** A looser definition of ES Observable-like types that is used for interoperability.
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. When converting from an Observable to a {@link Source},
 * this looser type is accepted as an input.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 * @see {@link Observable} for the full ES Observable type.
 */
interface ObservableLike<T> {
  /**
   * Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): { unsubscribe(): void };

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable]?(): Observable<T>;
}

/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
 * ecosystem.
 *
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
 * functionality to Observables, it provides utilities to cleanly convert to and from Observables.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface Observable<T> {
  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param onNext - Callback for the Observable issuing new values.
   * @param onError - Callback for the Observable encountering an error, terminating it.
   * @param onComplete - Callback for the Observable ending, after all values have been issued.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   */
  subscribe(
    onNext: (value: T) => any,
    onError?: (error: any) => any,
    onComplete?: () => any
  ): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable](): Observable<T>;
}

/** Returns the well-known symbol specifying the default ES Observable.
 * @privateRemarks
 * This symbol is used to mark an object as a default ES Observable. By the specification, an object
 * that abides by the default Observable implementation must carry a method set to this well-known
 * symbol that returns the Observable implementation. It's common for this object to be an
 * Observable itself and return itself on this method.
 *
 * When the runtime does not define `Symbol.observable`, the string key `'@@observable'` is used
 * as the fallback property name instead.
 *
 * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
 * between Observable implementations.
 *
 * @internal
 */
const observableSymbol = (): typeof Symbol.observable =>
  Symbol.observable || ('@@observable' as any);

/** Converts an ES Observable to a {@link Source}.
 * @param input - The {@link ObservableLike} object that will be converted.
 * @returns A {@link Source} wrapping the passed Observable.
 *
 * @remarks
 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and
 * the subscription starts, internally, it'll subscribe to the passed Observable, passing through
 * all of the Observable's values. As such, this utility provides intercompatibility converting from
 * standard Observables to Wonka Sources.
 *
 * If the input carries a method under the well-known observable symbol, that method is called and
 * its result is subscribed to; otherwise the input itself is subscribed to.
 *
 * Note that the Start signal is only sent to the sink after `subscribe()` has returned, since the
 * talkback needs the subscription handle to be able to cancel it. An Observable that emits
 * synchronously while being subscribed to will hence deliver those signals before the Start signal.
 *
 * @throws
 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
 * not support or expect errors to be handled by streams.
 */
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
  return sink => {
    // Prefer the Observable returned by the well-known symbol method, when the input has one
    const observable = input[observableSymbol()]
      ? input[observableSymbol()]!()
      : input;
    const subscription = observable.subscribe({
      next(value: T) {
        sink(push(value));
      },
      complete() {
        sink(SignalKind.End);
      },
      error(error) {
        throw error;
      },
    });
    sink(
      start(signal => {
        // Pull signals are ignored, as Observables are push-based; only Close cancels
        if (signal === TalkbackKind.Close) subscription.unsubscribe();
      })
    );
  };
}

/** Converts a {@link Source} to an ES Observable.
 * @param source - The {@link Source} that will be converted.
 * @returns An {@link Observable} wrapping the passed Source.
 *
 * @remarks
 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
 * internally subscribes to the Wonka Source and pulls new values. As such, this utility provides
 * intercompatibility converting from Wonka Sources to standard ES Observables.
 *
 * The returned subscription's `closed` flag becomes `true` either when the Source ends or when
 * `unsubscribe()` is called. Calling `unsubscribe()` on a subscription that has already terminated
 * is a no-op, so the Source's talkback receives a Close signal at most once and never after the
 * Source has ended by itself.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  return {
    subscribe(
      next: ObservableObserver<T> | ((value: T) => any),
      error?: (error: any) => any | undefined,
      complete?: () => any | undefined
    ) {
      // Normalize the callback-style overload into an observer object
      const observer: ObservableObserver<T> =
        typeof next === 'object' ? next : { next, error, complete };
      let talkback = talkbackPlaceholder;
      let ended = false;
      // The handle is created before subscribing to the source, so that a source that ends
      // synchronously can already mark it as closed.
      const subscription: ObservableSubscription = {
        closed: false,
        unsubscribe() {
          // Only cancel an active subscription; never send Close twice or after End
          if (!ended) {
            subscription.closed = true;
            ended = true;
            talkback(TalkbackKind.Close);
          }
        },
      };
      source(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === SignalKind.End) {
          ended = true;
          // Mark the handle as closed before notifying the observer of the completion
          subscription.closed = true;
          if (observer.complete) observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          // Store the talkback and immediately request the first value
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          // Request the next value after each delivered value
          talkback(TalkbackKind.Pull);
        }
      });
      return subscription;
    },
    [observableSymbol()]() {
      return this;
    },
  };
}