Mirror: 馃帺 A tiny but capable push & pull stream library for TypeScript and Flow
1import { Source, SignalKind, TalkbackKind } from './types'; 2import { push, start, talkbackPlaceholder, observableSymbol } from './helpers'; 3 4// NOTE: This must be placed in an exported file for `rollup-plugin-dts` 5// to include it in output typings files 6declare global { 7 interface SymbolConstructor { 8 readonly observable: symbol; 9 } 10} 11 12/** A definition of the ES Observable Subscription type that is returned by 13 * {@link Observable.subscribe} 14 * 15 * @remarks 16 * The Subscription in ES Observables is a handle that is held while the Observable is actively 17 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed} 18 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the 19 * ongoing subscription and end the {@link Observable} early. 20 * 21 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 22 */ 23interface ObservableSubscription { 24 /** A boolean flag indicating whether the subscription is closed. 25 * @remarks 26 * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and 27 * has terminated. No new values are expected. 28 * 29 * @readonly 30 */ 31 closed: boolean; 32 /** Cancels the subscription. 33 * @remarks 34 * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will 35 * subsequently not be called at all. The subscription will be terminated and become inactive. 36 */ 37 unsubscribe(): void; 38} 39 40/** A definition of the ES Observable Observer type that is used to receive data from an 41 * {@link Observable}. 42 * 43 * @remarks 44 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from 45 * an {@link Observable} as it issues them. 46 * 47 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable 48 * specification of an Observer. 49 */ 50interface ObservableObserver<T> { 51 /** Callback for the Observable issuing new values. 52 * @param value - The value that the {@link Observable} is sending. 53 */ 54 next(value: T): void; 55 /** Callback for the Observable encountering an error, terminating it. 56 * @param error - The error that the {@link Observable} has encountered. 57 */ 58 error?(error: any): void; 59 /** Callback for the Observable ending, after all values have been issued. */ 60 complete?(): void; 61} 62 63/** A looser definition of ES Observable-like types that is used for interoperability. 64 * @remarks 65 * The Observable is often used by multiple libraries supporting or creating streams to provide 66 * interoperability for push-based streams. When converting from an Observable to a {@link Source}, 67 * this looser type is accepted as an input. 68 * 69 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 70 * @see {@link Observable} for the full ES Observable type. 71 */ 72interface ObservableLike<T> { 73 /** 74 * Subscribes to new signals from an {@link Observable} via callbacks. 75 * @param observer - An object containing callbacks for the various events of an Observable. 76 * @returns Subscription handle of type {@link ObservableSubscription}. 77 * 78 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables 79 * issue events. 80 */ 81 subscribe(observer: ObservableObserver<T>): { unsubscribe(): void }; 82 83 /** The well-known symbol specifying the default ES Observable for an object. */ 84 [Symbol.observable]?(): Observable<T>; 85} 86 87/** An ES Observable type that is a de-facto standard for push-based data sources across the JS 88 * ecosystem. 89 * 90 * @remarks 91 * The Observable is often used by multiple libraries supporting or creating streams to provide 92 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in 93 * functionality to Observables, it provides utilities to cleanly convert to and from Observables. 94 * 95 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification. 96 */ 97interface Observable<T> { 98 /** Subscribes to new signals from an {@link Observable} via callbacks. 99 * @param observer - An object containing callbacks for the various events of an Observable. 100 * @returns Subscription handle of type {@link ObservableSubscription}. 101 * 102 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables 103 * issue events. 104 */ 105 subscribe(observer: ObservableObserver<T>): ObservableSubscription; 106 107 /** Subscribes to new signals from an {@link Observable} via callbacks. 108 * @param onNext - Callback for the Observable issuing new values. 109 * @param onError - Callback for the Observable encountering an error, terminating it. 110 * @param onComplete - Callback for the Observable ending, after all values have been issued. 111 * @returns Subscription handle of type {@link ObservableSubscription}. 112 */ 113 subscribe( 114 onNext: (value: T) => any, 115 onError?: (error: any) => any, 116 onComplete?: () => any 117 ): ObservableSubscription; 118 119 /** The well-known symbol specifying the default ES Observable for an object. */ 120 [Symbol.observable](): Observable<T>; 121} 122 123/** Converts an ES Observable to a {@link Source}. 124 * @param input - The {@link ObservableLike} object that will be converted. 125 * @returns A {@link Source} wrapping the passed Observable. 126 * 127 * @remarks 128 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and 129 * the subscription starts, internally, it'll subscribe to the passed Observable, passing through 130 * all of the Observable's values. As such, this utility provides intercompatibility converting from 131 * standard Observables to Wonka Sources. 132 * 133 * @throws 134 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does 135 * not support or expect errors to be handled by streams. 136 */ 137export function fromObservable<T>(input: ObservableLike<T>): Source<T> { 138 return sink => { 139 const subscription = ( 140 input[observableSymbol()] ? input[observableSymbol()]!() : input 141 ).subscribe({ 142 next(value: T) { 143 sink(push(value)); 144 }, 145 complete() { 146 sink(SignalKind.End); 147 }, 148 error(error) { 149 throw error; 150 }, 151 }); 152 sink( 153 start(signal => { 154 if (signal === TalkbackKind.Close) subscription.unsubscribe(); 155 }) 156 ); 157 }; 158} 159 160/** Converts a {@link Source} to an ES Observable. 161 * @param source - The {@link Source} that will be converted. 162 * @returns An {@link Observable} wrapping the passed Source. 163 * 164 * @remarks 165 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it 166 * internally subscribes to the Wonka Source and pulls new values. As such, this utility provides 167 * intercompatibility converting from Wonka Sources to standard ES Observables. 168 */ 169export function toObservable<T>(source: Source<T>): Observable<T> { 170 return { 171 subscribe( 172 next: ObservableObserver<T> | ((value: T) => any), 173 error?: (error: any) => any | undefined, 174 complete?: () => any | undefined 175 ) { 176 const observer: ObservableObserver<T> = 177 typeof next == 'object' ? next : { next, error, complete }; 178 let talkback = talkbackPlaceholder; 179 let ended = false; 180 source(signal => { 181 if (ended) { 182 /*noop*/ 183 } else if (signal === SignalKind.End) { 184 ended = true; 185 if (observer.complete) observer.complete(); 186 } else if (signal.tag === SignalKind.Start) { 187 (talkback = signal[0])(TalkbackKind.Pull); 188 } else { 189 observer.next(signal[0]); 190 talkback(TalkbackKind.Pull); 191 } 192 }); 193 const subscription = { 194 closed: false, 195 unsubscribe() { 196 subscription.closed = true; 197 ended = true; 198 talkback(TalkbackKind.Close); 199 }, 200 }; 201 return subscription; 202 }, 203 [observableSymbol()]() { 204 return this; 205 }, 206 }; 207}