1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
// Type-only augmentation of the global `SymbolConstructor`, so that `Symbol.observable` can be
// referenced in types and computed property names throughout this file.
// NOTE: This declares the property for the type checker only and doesn't define the symbol at
// runtime, which is why `observableSymbol` falls back to the `'@@observable'` string when
// `Symbol.observable` is falsy.
declare global {
  interface SymbolConstructor {
    /** The well-known symbol under which an object exposes its default ES Observable. */
    readonly observable: symbol;
  }
}
9
/** A definition of the ES Observable Subscription type that is returned by
 * {@link Observable.subscribe}
 *
 * @remarks
 * The Subscription in ES Observables is a handle that is held while the Observable is actively
 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
 * ongoing subscription and end the {@link Observable} early.
 *
 * The handle is meant to be observed and cancelled by its holder, but never mutated by it, which
 * is why {@link ObservableSubscription.closed} is declared as read-only.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface ObservableSubscription {
  /** A boolean flag indicating whether the subscription is closed.
   * @remarks
   * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
   * has terminated. No new values are expected.
   *
   * The property is optional, so a subscription handle that doesn't expose this flag is still
   * accepted. Implementations that update the flag themselves (via a getter or a mutable field on
   * their own object) remain assignable to this type; only writes through this type are rejected.
   *
   * @readonly
   */
  readonly closed?: boolean;
  /** Cancels the subscription.
   * @remarks
   * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
   * subsequently not be called at all. The subscription will be terminated and become inactive.
   */
  unsubscribe(): void;
}
37
/** A definition of the ES Observable Observer type that is used to receive data from an
 * {@link Observable}.
 *
 * @typeParam T - The type of the values that the {@link Observable} issues.
 *
 * @remarks
 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
 * an {@link Observable} as it issues them. Only {@link ObservableObserver.next} is required; the
 * `error` and `complete` callbacks may be omitted.
 *
 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
 * specification of an Observer.
 */
interface ObservableObserver<T> {
  /** Callback for the Observable issuing new values.
   * @param value - The value that the {@link Observable} is sending.
   */
  next(value: T): void;
  /** Callback for the Observable encountering an error, terminating it.
   * @param error - The error that the {@link Observable} has encountered.
   *
   * @remarks
   * This callback is optional. Observables created by {@link toObservable} don't call it.
   */
  error?(error: any): void;
  /** Callback for the Observable ending, after all values have been issued.
   * @remarks
   * This callback is optional. Observables created by {@link toObservable} only call it when it's
   * defined.
   */
  complete?(): void;
}
60
/** A looser definition of ES Observable-like types that is used for interoperability.
 *
 * @typeParam T - The type of the values that the Observable issues.
 *
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. When converting from an Observable to a {@link Source},
 * this looser type is accepted as an input.
 *
 * Compared to {@link Observable}, only the observer-object form of `subscribe` is required and the
 * `Symbol.observable` method is optional.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 * @see {@link Observable} for the full ES Observable type.
 */
interface ObservableLike<T> {
  /**
   * Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object.
   * @remarks
   * When this method is present, {@link fromObservable} calls it and subscribes to the
   * {@link Observable} it returns, rather than subscribing to the object itself.
   */
  [Symbol.observable]?(): Observable<T>;
}
84
/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
 * ecosystem.
 *
 * @typeParam T - The type of the values that the Observable issues.
 *
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
 * functionality to Observables, it provides utilities to cleanly convert to and from Observables.
 *
 * An Observable may be subscribed to either with an {@link ObservableObserver} object or with up to
 * three positional callbacks; both forms return an {@link ObservableSubscription}.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface Observable<T> {
  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param onNext - Callback for the Observable issuing new values.
   * @param onError - Callback for the Observable encountering an error, terminating it.
   * @param onComplete - Callback for the Observable ending, after all values have been issued.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @remarks
   * This is the positional-callback equivalent of passing an {@link ObservableObserver} object.
   */
  subscribe(
    onNext: (value: T) => any,
    onError?: (error: any) => any,
    onComplete?: () => any
  ): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object.
   * @remarks
   * Observables created by {@link toObservable} return themselves from this method.
   */
  [Symbol.observable](): Observable<T>;
}
120
/** Resolves the key under which an object exposes its default ES Observable.
 * @returns `Symbol.observable` when it is defined (truthy) in the current environment, and the
 * string `'@@observable'` otherwise.
 *
 * @privateRemarks
 * By the specification, an object that abides by the default Observable implementation carries a
 * method under this well-known symbol that returns the Observable implementation; commonly the
 * object is an Observable itself and returns itself from that method.
 *
 * The key is looked up on every call rather than cached at module evaluation, so the current value
 * of `Symbol.observable` is always the one that is used.
 *
 * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
 * between Observable implementations.
 *
 * @internal
 */
const observableSymbol = (): typeof Symbol.observable => {
  const wellKnown = Symbol.observable;
  if (wellKnown) return wellKnown;
  // Fallback key for environments in which no `Symbol.observable` is defined.
  return '@@observable' as any;
};
135
/** Wraps an ES Observable in a {@link Source}.
 * @param input - The {@link ObservableLike} object to convert.
 * @returns A {@link Source} that forwards the values of the passed Observable.
 *
 * @remarks
 * Each time the returned Source is started with a sink, it subscribes to the Observable. If
 * `input` carries a method under the well-known Observable symbol, the Observable returned by
 * that method is subscribed to; otherwise `input` itself is. Every value is forwarded to the sink
 * as a push signal, and completion is forwarded as the end signal.
 *
 * The start signal, carrying the talkback, is sent to the sink after the subscription has been
 * created. The talkback only reacts to {@link TalkbackKind.Close}, upon which it unsubscribes from
 * the Observable; other talkback signals are ignored.
 *
 * @throws
 * When the passed ES Observable reports an error, it is simply re-thrown from the observer's
 * `error` callback, as a {@link Source} does not support or expect errors to be handled by streams.
 */
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
  return sink => {
    // Prefer the Observable exposed under the well-known symbol over the input object itself.
    let target: ObservableLike<T> = input;
    if (input[observableSymbol()]) {
      target = input[observableSymbol()]!();
    }

    const subscription = target.subscribe({
      next: (value: T) => {
        sink(push(value));
      },
      complete: () => {
        sink(SignalKind.End);
      },
      error: error => {
        throw error;
      },
    });

    const startSignal = start<T>(signal => {
      if (signal !== TalkbackKind.Close) return;
      subscription.unsubscribe();
    });
    sink(startSignal);
  };
}
172
/** Converts a {@link Source} to an ES Observable.
 * @param source - The {@link Source} that will be converted.
 * @returns An {@link Observable} wrapping the passed Source.
 *
 * @remarks
 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
 * internally subscribes to the Wonka Source and pulls new values: once when the Source starts and
 * again after each value that has been passed to the observer's `next` callback. As such, this
 * utility provides intercompatibility converting from Wonka Sources to standard ES Observables.
 *
 * `subscribe` accepts either an {@link ObservableObserver} object or positional `next`, `error`,
 * and `complete` callbacks. The `error` callback is never called by this Observable.
 *
 * The returned subscription's `closed` flag becomes `true` once the Source has ended or once
 * `unsubscribe` has been called. Calling `unsubscribe` closes the Source at most once; repeated
 * calls, and calls made after the Source has ended, don't send another close signal.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  return {
    subscribe(
      next: ObservableObserver<T> | ((value: T) => any),
      error?: (error: any) => any | undefined,
      complete?: () => any | undefined
    ) {
      const observer: ObservableObserver<T> =
        typeof next === 'object' ? next : { next, error, complete };
      let talkback = talkbackPlaceholder;
      let ended = false;

      // The handle is created before subscribing to the source, so that a source which ends
      // synchronously while it's being started can already mark the handle as closed.
      const subscription = {
        closed: false,
        unsubscribe() {
          subscription.closed = true;
          // Only close the source once, and never after it has already ended.
          if (!ended) {
            ended = true;
            talkback(TalkbackKind.Close);
          }
        },
      };

      source(signal => {
        if (ended) {
          /* Signals arriving after the end or after unsubscribing are ignored. */
        } else if (signal === SignalKind.End) {
          ended = true;
          // Mark the handle as closed before notifying the observer of the completion.
          subscription.closed = true;
          if (observer.complete) observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          // Store the source's talkback and immediately request the first value.
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          // Request the next value once the observer has received the current one.
          talkback(TalkbackKind.Pull);
        }
      });

      return subscription;
    },
    [observableSymbol()]() {
      // The returned object is itself the default Observable.
      return this;
    },
  };
}