1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
/** The ES Observable Subscription type that is returned by {@link Observable.subscribe}.
 *
 * @remarks
 * A Subscription is the handle a consumer holds while an Observable is actively streaming values.
 * {@link ObservableSubscription.closed} indicates whether it is still active, and
 * {@link ObservableSubscription.unsubscribe} cancels the ongoing subscription, ending the
 * {@link Observable} early.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface ObservableSubscription {
  /** A boolean flag indicating whether the subscription is closed.
   * @remarks
   * When `true`, the subscription has terminated and will not issue new values to the
   * {@link ObservableObserver}. The property is optional, so looser implementations may omit it.
   *
   * @readonly
   */
  closed?: boolean;
  /** Cancels the subscription.
   * @remarks
   * This cancels the ongoing subscription; the {@link ObservableObserver}'s callbacks will not be
   * called afterwards. The subscription is terminated and becomes inactive.
   */
  unsubscribe(): void;
}
31
/** The ES Observable Observer type that is used to receive data from an {@link Observable}.
 *
 * @remarks
 * An Observer is the object of callbacks supplied to {@link Observable.subscribe}; the
 * {@link Observable} invokes these callbacks as it issues events. Only `next` is required.
 *
 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
 * specification of an Observer.
 */
interface ObservableObserver<T> {
  /** Callback for the Observable issuing a new value.
   * @param value - The value that the {@link Observable} is sending.
   */
  next(value: T): void;
  /** Optional callback for the Observable encountering an error, which terminates it.
   * @param error - The error that the {@link Observable} has encountered.
   */
  error?(error: any): void;
  /** Optional callback for the Observable ending, after all values have been issued. */
  complete?(): void;
}
54
/** A looser definition of ES Observable-like types that is used for interoperability.
 *
 * @remarks
 * Observables are shared between many stream libraries as a common, push-based interface. When
 * converting from an Observable to a {@link Source}, this looser type is accepted as the input:
 * only the observer-object form of `subscribe` is required and the well-known symbol method is
 * optional.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 * @see {@link Observable} for the full ES Observable type.
 */
interface ObservableLike<T> {
  /** Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** The well-known symbol method returning the default ES Observable for an object, if any. */
  [Symbol.observable]?(): Observable<T>;
}
78
/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
 * ecosystem.
 *
 * @remarks
 * Observables are shared between many stream libraries as a common, push-based interface. As
 * Wonka's {@link Source | Sources} are similar in functionality to Observables, utilities are
 * provided to convert to and from Observables.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface Observable<T> {
  /** Subscribes to new signals from an {@link Observable} via an observer object.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** Subscribes to new signals from an {@link Observable} via individual callbacks.
   * @param onNext - Callback for the Observable issuing new values.
   * @param onError - Callback for the Observable encountering an error, terminating it.
   * @param onComplete - Callback for the Observable ending, after all values have been issued.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   */
  subscribe(
    onNext: (value: T) => any,
    onError?: (error: any) => any,
    onComplete?: () => any
  ): ObservableSubscription;

  /** The well-known symbol method returning the default ES Observable for an object. */
  [Symbol.observable](): Observable<T>;
}
114
/** Returns the well-known symbol specifying the default ES Observable.
 * @returns `Symbol.observable` when the runtime defines it, otherwise the string key
 * `'@@observable'` as a fallback.
 *
 * @privateRemarks
 * This symbol is used to mark an object as a default ES Observable. By the specification, an object
 * that abides by the default Observable implementation must carry a method set to this well-known
 * symbol that returns the Observable implementation. It's common for this object to be an
 * Observable itself and return itself on this method.
 *
 * The key is looked up on every call rather than cached at module load, so the value of
 * `Symbol.observable` at call time is the one that is used.
 *
 * @see {@link https://github.com/0no-co/wonka/issues/122} for notes on the intercompatibility
 * between Observable implementations.
 *
 * @internal
 */
const observableSymbol = (): typeof Symbol.observable => Symbol.observable || '@@observable';
128
/** Converts an ES Observable to a {@link Source}.
 * @param input - The {@link ObservableLike} object that will be converted.
 * @returns A {@link Source} wrapping the passed Observable.
 *
 * @remarks
 * When the returned Source receives a sink, it subscribes to the passed Observable (resolving it
 * through the well-known observable symbol method first, if the input has one) and forwards every
 * value as a Push signal and completion as an End signal. The Start signal is sent to the sink
 * after `subscribe` has returned; its talkback unsubscribes from the Observable on Close.
 *
 * Once the Observable has completed or the sink has sent Close, no further signals are forwarded
 * and `unsubscribe` is not called again.
 *
 * @throws
 * When the passed ES Observable reports an error, the error is simply re-thrown as {@link Source}
 * does not support or expect errors to be handled by streams.
 */
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
  return sink => {
    // Set on completion or Close; guards against late values and repeated teardown.
    let ended = false;
    const subscription = (
      input[observableSymbol()] ? input[observableSymbol()]!() : input
    ).subscribe({
      next(value: T) {
        if (!ended) sink(push(value));
      },
      complete() {
        if (!ended) {
          ended = true;
          sink(SignalKind.End);
        }
      },
      error(error) {
        throw error;
      },
    });
    sink(
      start(signal => {
        // Pull signals are ignored: an Observable pushes values on its own schedule.
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          subscription.unsubscribe();
        }
      })
    );
  };
}
165
/** Converts a {@link Source} to an ES Observable.
 * @param source - The {@link Source} that will be converted.
 * @returns An {@link Observable} wrapping the passed Source.
 *
 * @remarks
 * Each call to `subscribe` on the returned Observable subscribes to the Source and pulls values
 * from it: a Pull is sent when the Source starts and again after every value handed to the
 * observer's `next` callback. When the Source ends, the subscription is marked as closed and the
 * observer's `complete` callback (if any) is called.
 *
 * `unsubscribe` marks the subscription as closed and sends Close to the Source at most once; it
 * sends nothing if the Source has already ended. The returned object's well-known observable
 * symbol method returns the Observable itself.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  return {
    subscribe(
      next: ObservableObserver<T> | ((value: T) => any),
      error?: (error: any) => any | undefined,
      complete?: () => any | undefined
    ) {
      // Accept either an observer object or up to three positional callbacks.
      const observer: ObservableObserver<T> =
        typeof next === 'object' ? next : { next, error, complete };
      let talkback = talkbackPlaceholder;
      let ended = false;
      // Created before subscribing so a synchronously ending source can mark it as closed.
      const subscription = {
        closed: false,
        unsubscribe() {
          subscription.closed = true;
          if (!ended) {
            ended = true;
            talkback(TalkbackKind.Close);
          }
        },
      };
      source(signal => {
        // Ignore everything after the source ended or the observer unsubscribed.
        if (ended) return;
        if (signal === SignalKind.End) {
          ended = true;
          subscription.closed = true;
          if (observer.complete) observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          // The observer may have unsubscribed inside `next`; don't pull from a closed source.
          if (!ended) talkback(TalkbackKind.Pull);
        }
      });
      return subscription;
    },
    [observableSymbol()]() {
      return this;
    },
  };
}