Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
at v6.3.3 7.8 kB view raw
1import { Source, SignalKind, TalkbackKind } from './types'; 2import { push, start, talkbackPlaceholder, observableSymbol } from './helpers'; 3 4/** A definition of the ES Observable Subscription type that is returned by 5 * {@link Observable.subscribe} 6 * 7 * @remarks 8 * The Subscription in ES Observables is a handle that is held while the Observable is actively 9 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed} 10 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the 11 * ongoing subscription and end the {@link Observable} early. 12 * 13 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 14 */ 15interface ObservableSubscription { 16 /** A boolean flag indicating whether the subscription is closed. 17 * @remarks 18 * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and 19 * has terminated. No new values are expected. 20 * 21 * @readonly 22 */ 23 closed: boolean; 24 /** Cancels the subscription. 25 * @remarks 26 * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will 27 * subsequently not be called at all. The subscription will be terminated and become inactive. 28 */ 29 unsubscribe(): void; 30} 31 32/** A definition of the ES Observable Observer type that is used to receive data from an 33 * {@link Observable}. 34 * 35 * @remarks 36 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from 37 * an {@link Observable} as it issues them. 38 * 39 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable 40 * specification of an Observer. 41 */ 42interface ObservableObserver<T> { 43 /** Callback for the Observable issuing new values. 44 * @param value - The value that the {@link Observable} is sending. 45 */ 46 next(value: T): void; 47 /** Callback for the Observable encountering an error, terminating it. 48 * @param error - The error that the {@link Observable} has encountered. 49 */ 50 error?(error: any): void; 51 /** Callback for the Observable ending, after all values have been issued. */ 52 complete?(): void; 53} 54 55/** A looser definition of ES Observable-like types that is used for interoperability. 56 * @remarks 57 * The Observable is often used by multiple libraries supporting or creating streams to provide 58 * interoperability for push-based streams. When converting from an Observable to a {@link Source}, 59 * this looser type is accepted as an input. 60 * 61 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 62 * @see {@link Observable} for the full ES Observable type. 63 */ 64interface ObservableLike<T> { 65 /** 66 * Subscribes to new signals from an {@link Observable} via callbacks. 67 * @param observer - An object containing callbacks for the various events of an Observable. 68 * @returns Subscription handle of type {@link ObservableSubscription}. 69 * 70 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables 71 * issue events. 72 */ 73 subscribe(observer: ObservableObserver<T>): { unsubscribe(): void }; 74 75 /** The well-known symbol specifying the default ES Observable for an object. */ 76 [Symbol.observable]?(): Observable<T>; 77} 78 79/** An ES Observable type that is a de-facto standard for push-based data sources across the JS 80 * ecosystem. 81 * 82 * @remarks 83 * The Observable is often used by multiple libraries supporting or creating streams to provide 84 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in 85 * functionality to Observables, it provides utilities to cleanly convert to and from Observables. 86 * 87 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 88 */ 89interface Observable<T> { 90 /** Subscribes to new signals from an {@link Observable} via callbacks. 91 * @param observer - An object containing callbacks for the various events of an Observable. 92 * @returns Subscription handle of type {@link ObservableSubscription}. 93 * 94 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables 95 * issue events. 96 */ 97 subscribe(observer: ObservableObserver<T>): ObservableSubscription; 98 99 /** Subscribes to new signals from an {@link Observable} via callbacks. 100 * @param onNext - Callback for the Observable issuing new values. 101 * @param onError - Callback for the Observable encountering an error, terminating it. 102 * @param onComplete - Callback for the Observable ending, after all values have been issued. 103 * @returns Subscription handle of type {@link ObservableSubscription}. 104 */ 105 subscribe( 106 onNext: (value: T) => any, 107 onError?: (error: any) => any, 108 onComplete?: () => any 109 ): ObservableSubscription; 110 111 /** The well-known symbol specifying the default ES Observable for an object. */ 112 [Symbol.observable](): Observable<T>; 113} 114 115/** Converts an ES Observable to a {@link Source}. 116 * @param input - The {@link ObservableLike} object that will be converted. 117 * @returns A {@link Source} wrapping the passed Observable. 118 * 119 * @remarks 120 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and 121 * the subscription starts, internally, it'll subscribe to the passed Observable, passing through 122 * all of the Observable's values. As such, this utility provides intercompatibility converting from 123 * standard Observables to Wonka Sources. 124 * 125 * @throws 126 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does 127 * not support or expect errors to be handled by streams. 128 */ 129export function fromObservable<T>(input: ObservableLike<T>): Source<T> { 130 return sink => { 131 const subscription = ( 132 input[observableSymbol()] ? input[observableSymbol()]!() : input 133 ).subscribe({ 134 next(value: T) { 135 sink(push(value)); 136 }, 137 complete() { 138 sink(SignalKind.End); 139 }, 140 error(error) { 141 throw error; 142 }, 143 }); 144 sink( 145 start(signal => { 146 if (signal === TalkbackKind.Close) subscription.unsubscribe(); 147 }) 148 ); 149 }; 150} 151 152/** Converts a {@link Source} to an ES Observable. 153 * @param source - The {@link Source} that will be converted. 154 * @returns An {@link Observable} wrapping the passed Source. 155 * 156 * @remarks 157 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it 158 * internally subscribes to the Wonka Source and pulls new values. As such, this utility provides 159 * intercompatibility converting from Wonka Sources to standard ES Observables. 160 */ 161export function toObservable<T>(source: Source<T>): Observable<T> { 162 return { 163 subscribe( 164 next: ObservableObserver<T> | ((value: T) => any), 165 error?: (error: any) => any | undefined, 166 complete?: () => any | undefined 167 ) { 168 const observer: ObservableObserver<T> = 169 typeof next == 'object' ? next : { next, error, complete }; 170 let talkback = talkbackPlaceholder; 171 let ended = false; 172 source(signal => { 173 if (ended) { 174 /*noop*/ 175 } else if (signal === SignalKind.End) { 176 ended = true; 177 if (observer.complete) observer.complete(); 178 } else if (signal.tag === SignalKind.Start) { 179 (talkback = signal[0])(TalkbackKind.Pull); 180 } else { 181 observer.next(signal[0]); 182 talkback(TalkbackKind.Pull); 183 } 184 }); 185 const subscription = { 186 closed: false, 187 unsubscribe() { 188 subscription.closed = true; 189 ended = true; 190 talkback(TalkbackKind.Close); 191 }, 192 }; 193 return subscription; 194 }, 195 [observableSymbol()]() { 196 return this; 197 }, 198 }; 199}