1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder, observableSymbol } from './helpers';
3
// NOTE: This must be placed in an exported file for `rollup-plugin-dts`
// to include it in output typings files
declare global {
  interface SymbolConstructor {
    /** The well-known symbol under which an object exposes its default ES Observable. */
    readonly observable: symbol;
  }
}
11
/** A definition of the ES Observable Subscription type that is returned by
 * {@link Observable.subscribe}
 *
 * @remarks
 * The Subscription in ES Observables is the handle a subscriber holds while the Observable is
 * actively streaming values. {@link ObservableSubscription.closed} reports whether the
 * subscription is still active, and {@link ObservableSubscription.unsubscribe} cancels the
 * ongoing subscription, ending the {@link Observable} early.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface ObservableSubscription {
  /** A boolean flag indicating whether the subscription is closed.
   * @remarks
   * When `true`, the subscription has terminated and will not issue new values to the
   * {@link ObservableObserver}. No new values are expected.
   *
   * @readonly
   */
  closed: boolean;
  /** Cancels the subscription.
   * @remarks
   * This cancels the ongoing subscription; the {@link ObservableObserver}'s callbacks will
   * subsequently not be called at all. The subscription is terminated and becomes inactive.
   */
  unsubscribe(): void;
}
39
/** A definition of the ES Observable Observer type that is used to receive data from an
 * {@link Observable}.
 *
 * @remarks
 * The Observer in ES Observables is the object of callbacks supplied to
 * {@link Observable.subscribe}; the {@link Observable} invokes them as it issues events.
 * Only `next` is required; `error` and `complete` may be omitted.
 *
 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
 * specification of an Observer.
 */
interface ObservableObserver<T> {
  /** Callback for the Observable issuing new values.
   * @param value - The value that the {@link Observable} is sending.
   */
  next(value: T): void;
  /** Callback for the Observable encountering an error, terminating it.
   * @param error - The error that the {@link Observable} has encountered.
   */
  error?(error: any): void;
  /** Callback for the Observable ending, after all values have been issued. */
  complete?(): void;
}
62
/** A looser definition of ES Observable-like types that is used for interoperability.
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. When converting from an Observable to a {@link Source},
 * this looser type is accepted as an input: only an observer-accepting `subscribe` method is
 * required, and the `Symbol.observable` method is optional.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 * @see {@link Observable} for the full ES Observable type.
 */
interface ObservableLike<T> {
  /**
   * Subscribes to new signals from an {@link Observable} via callbacks.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns A subscription handle that exposes at least an `unsubscribe()` method.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): { unsubscribe(): void };

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable]?(): Observable<T>;
}
86
/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
 * ecosystem.
 *
 * @remarks
 * The Observable is often used by multiple libraries supporting or creating streams to provide
 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
 * functionality to Observables, it provides utilities to cleanly convert to and from Observables.
 *
 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
 */
interface Observable<T> {
  /** Subscribes to new signals from an {@link Observable} via an observer object.
   * @param observer - An object containing callbacks for the various events of an Observable.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   *
   * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
   * issue events.
   */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;

  /** Subscribes to new signals from an {@link Observable} via individual callbacks.
   * @param onNext - Callback for the Observable issuing new values.
   * @param onError - Callback for the Observable encountering an error, terminating it.
   * @param onComplete - Callback for the Observable ending, after all values have been issued.
   * @returns Subscription handle of type {@link ObservableSubscription}.
   */
  subscribe(
    onNext: (value: T) => any,
    onError?: (error: any) => any,
    onComplete?: () => any
  ): ObservableSubscription;

  /** The well-known symbol specifying the default ES Observable for an object. */
  [Symbol.observable](): Observable<T>;
}
122
/** Converts an ES Observable to a {@link Source}.
 * @param input - The {@link ObservableLike} object that will be converted.
 * @returns A {@link Source} wrapping the passed Observable.
 *
 * @remarks
 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink},
 * it first sends the Start signal and only then subscribes to the passed Observable, passing
 * through all of the Observable's values. Sending Start first guarantees that an Observable which
 * emits or completes synchronously inside `subscribe()` cannot deliver values or the End signal
 * ahead of Start.
 *
 * If the sink closes the source while handling Start, the Observable is never subscribed to. If it
 * closes while `subscribe()` is still running, the unsubscription is deferred until the
 * subscription handle is available. No signals are forwarded after the sink has closed the source
 * or after the Observable has completed.
 *
 * @throws
 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
 * not support or expect errors to be handled by streams.
 */
export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
  return sink => {
    let subscription: { unsubscribe(): void } | undefined;
    // Set once the sink closed the source or the Observable completed.
    let done = false;
    // Set when Close arrives before `subscribe()` has returned its handle.
    let pendingClose = false;

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !done) {
          done = true;
          if (subscription) subscription.unsubscribe();
          else pendingClose = true;
        }
      })
    );

    // The sink closed immediately on Start; there is nothing to subscribe to.
    if (done) return;

    // Prefer the object's default Observable when it exposes one via the well-known symbol.
    subscription = (
      input[observableSymbol()] ? input[observableSymbol()]!() : input
    ).subscribe({
      next(value: T) {
        if (!done) sink(push(value));
      },
      complete() {
        if (!done) {
          done = true;
          sink(SignalKind.End);
        }
      },
      error(error) {
        throw error;
      },
    });

    // Honour a Close that was requested during a synchronous emission.
    if (pendingClose) subscription.unsubscribe();
  };
}
159
/** Converts a {@link Source} to an ES Observable.
 * @param source - The {@link Source} that will be converted.
 * @returns An {@link Observable} wrapping the passed Source.
 *
 * @remarks
 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
 * internally subscribes to the Wonka Source and pulls new values: one Pull is sent when the source
 * starts and another after each value has been handed to the observer. As such, this utility
 * provides intercompatibility converting from Wonka Sources to standard ES Observables.
 *
 * The returned subscription's `closed` flag becomes `true` both when the source ends by itself and
 * when `unsubscribe()` is called. `unsubscribe()` is idempotent: the Close signal is sent to the
 * source at most once and never after the source has already ended. The observer's `error`
 * callback is never invoked by this wrapper.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  return {
    subscribe(
      next: ObservableObserver<T> | ((value: T) => any),
      error?: (error: any) => any | undefined,
      complete?: () => any | undefined
    ) {
      // Normalise the callback-style overload into an observer object.
      const observer: ObservableObserver<T> =
        typeof next === 'object' ? next : { next, error, complete };
      let talkback = talkbackPlaceholder;
      let ended = false;
      // Created before the source starts so that a synchronous End can already flip `closed`.
      const subscription = {
        closed: false,
        unsubscribe() {
          if (ended) return;
          subscription.closed = ended = true;
          talkback(TalkbackKind.Close);
        },
      };
      source(signal => {
        if (ended) {
          /*noop*/
        } else if (signal === SignalKind.End) {
          subscription.closed = ended = true;
          if (observer.complete) observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          // The observer may have unsubscribed inside `next`; don't pull after Close.
          if (!ended) talkback(TalkbackKind.Pull);
        }
      });
      return subscription;
    },
    [observableSymbol()]() {
      return this;
    },
  };
}