1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder, observableSymbol } from './helpers';
3
4/** A definition of the ES Observable Subscription type that is returned by
5 * {@link Observable.subscribe}
6 *
7 * @remarks
8 * The Subscription in ES Observables is a handle that is held while the Observable is actively
9 * streaming values. As such, it's used to indicate with {@link ObservableSubscription.closed}
10 * whether it's active, and {@link ObservableSubscription.unsubscribe} may be used to cancel the
11 * ongoing subscription and end the {@link Observable} early.
12 *
13 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
14 */
15interface ObservableSubscription {
16 /** A boolean flag indicating whether the subscription is closed.
17 * @remarks
18 * When `true`, the subscription will not issue new values to the {@link ObservableObserver} and
19 * has terminated. No new values are expected.
20 *
21 * @readonly
22 */
23 closed: boolean;
24 /** Cancels the subscription.
25 * @remarks
26 * This cancels the ongoing subscription and the {@link ObservableObserver}'s callbacks will
27 * subsequently not be called at all. The subscription will be terminated and become inactive.
28 */
29 unsubscribe(): void;
30}
31
32/** A definition of the ES Observable Observer type that is used to receive data from an
33 * {@link Observable}.
34 *
35 * @remarks
36 * The Observer in ES Observables is supplied to {@link Observable.subscribe} to receive events from
37 * an {@link Observable} as it issues them.
38 *
39 * @see {@link https://github.com/tc39/proposal-observable#observer} for the ES Observable
40 * specification of an Observer.
41 */
42interface ObservableObserver<T> {
43 /** Callback for the Observable issuing new values.
44 * @param value - The value that the {@link Observable} is sending.
45 */
46 next(value: T): void;
47 /** Callback for the Observable encountering an error, terminating it.
48 * @param error - The error that the {@link Observable} has encountered.
49 */
50 error?(error: any): void;
51 /** Callback for the Observable ending, after all values have been issued. */
52 complete?(): void;
53}
54
55/** A looser definition of ES Observable-like types that is used for interoperability.
56 * @remarks
57 * The Observable is often used by multiple libraries supporting or creating streams to provide
58 * interoperability for push-based streams. When converting from an Observable to a {@link Source},
59 * this looser type is accepted as an input.
60 *
61 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
62 * @see {@link Observable} for the full ES Observable type.
63 */
64interface ObservableLike<T> {
65 /**
66 * Subscribes to new signals from an {@link Observable} via callbacks.
67 * @param observer - An object containing callbacks for the various events of an Observable.
68 * @returns Subscription handle of type {@link ObservableSubscription}.
69 *
70 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
71 * issue events.
72 */
73 subscribe(observer: ObservableObserver<T>): { unsubscribe(): void };
74
75 /** The well-known symbol specifying the default ES Observable for an object. */
76 [Symbol.observable]?(): Observable<T>;
77}
78
79/** An ES Observable type that is a de-facto standard for push-based data sources across the JS
80 * ecosystem.
81 *
82 * @remarks
83 * The Observable is often used by multiple libraries supporting or creating streams to provide
84 * interoperability for push-based streams. As Wonka's {@link Source | Sources} are similar in
85 * functionality to Observables, it provides utilities to cleanly convert to and from Observables.
86 *
87 * @see {@link https://github.com/tc39/proposal-observable} for the ES Observable specification.
88 */
89interface Observable<T> {
90 /** Subscribes to new signals from an {@link Observable} via callbacks.
91 * @param observer - An object containing callbacks for the various events of an Observable.
92 * @returns Subscription handle of type {@link ObservableSubscription}.
93 *
94 * @see {@link ObservableObserver} for the callbacks in an object that are called as Observables
95 * issue events.
96 */
97 subscribe(observer: ObservableObserver<T>): ObservableSubscription;
98
99 /** Subscribes to new signals from an {@link Observable} via callbacks.
100 * @param onNext - Callback for the Observable issuing new values.
101 * @param onError - Callback for the Observable encountering an error, terminating it.
102 * @param onComplete - Callback for the Observable ending, after all values have been issued.
103 * @returns Subscription handle of type {@link ObservableSubscription}.
104 */
105 subscribe(
106 onNext: (value: T) => any,
107 onError?: (error: any) => any,
108 onComplete?: () => any
109 ): ObservableSubscription;
110
111 /** The well-known symbol specifying the default ES Observable for an object. */
112 [Symbol.observable](): Observable<T>;
113}
114
115/** Converts an ES Observable to a {@link Source}.
116 * @param input - The {@link ObservableLike} object that will be converted.
117 * @returns A {@link Source} wrapping the passed Observable.
118 *
119 * @remarks
120 * This converts an ES Observable to a {@link Source}. When this Source receives a {@link Sink} and
121 * the subscription starts, internally, it'll subscribe to the passed Observable, passing through
122 * all of the Observable's values. As such, this utility provides intercompatibility converting from
123 * standard Observables to Wonka Sources.
124 *
125 * @throws
126 * When the passed ES Observable throws, the error is simply re-thrown as {@link Source} does
127 * not support or expect errors to be handled by streams.
128 */
129export function fromObservable<T>(input: ObservableLike<T>): Source<T> {
130 return sink => {
131 const subscription = (
132 input[observableSymbol()] ? input[observableSymbol()]!() : input
133 ).subscribe({
134 next(value: T) {
135 sink(push(value));
136 },
137 complete() {
138 sink(SignalKind.End);
139 },
140 error(error) {
141 throw error;
142 },
143 });
144 sink(
145 start(signal => {
146 if (signal === TalkbackKind.Close) subscription.unsubscribe();
147 })
148 );
149 };
150}
151
152/** Converts a {@link Source} to an ES Observable.
153 * @param source - The {@link Source} that will be converted.
154 * @returns An {@link Observable} wrapping the passed Source.
155 *
156 * @remarks
157 * This converts a {@link Source} to an {@link Observable}. When this Observable is subscribed to, it
158 * internally subscribes to the Wonka Source and pulls new values. As such, this utility provides
159 * intercompatibility converting from Wonka Sources to standard ES Observables.
160 */
161export function toObservable<T>(source: Source<T>): Observable<T> {
162 return {
163 subscribe(
164 next: ObservableObserver<T> | ((value: T) => any),
165 error?: (error: any) => any | undefined,
166 complete?: () => any | undefined
167 ) {
168 const observer: ObservableObserver<T> =
169 typeof next == 'object' ? next : { next, error, complete };
170 let talkback = talkbackPlaceholder;
171 let ended = false;
172 source(signal => {
173 if (ended) {
174 /*noop*/
175 } else if (signal === SignalKind.End) {
176 ended = true;
177 if (observer.complete) observer.complete();
178 } else if (signal.tag === SignalKind.Start) {
179 (talkback = signal[0])(TalkbackKind.Pull);
180 } else {
181 observer.next(signal[0]);
182 talkback(TalkbackKind.Pull);
183 }
184 });
185 const subscription = {
186 closed: false,
187 unsubscribe() {
188 subscription.closed = true;
189 ended = true;
190 talkback(TalkbackKind.Close);
191 },
192 };
193 return subscription;
194 },
195 [observableSymbol()]() {
196 return this;
197 },
198 };
199}