1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
4interface ObservableSubscription {
5 closed?: boolean;
6 unsubscribe(): void;
7}
8
9interface ObservableObserver<T> {
10 next(value: T): void;
11 error(error: any): void;
12 complete(): void;
13}
14
15interface Observable<T> {
16 subscribe(observer: ObservableObserver<T>): ObservableSubscription;
17}
18
19const observableSymbol = (): symbol | string => Symbol.observable || '@@observable';
20
21export function fromObservable<T>(input: Observable<T>): Source<T> {
22 input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input;
23 return sink => {
24 const subscription = input.subscribe({
25 next(value: T) {
26 sink(push(value));
27 },
28 complete() {
29 sink(SignalKind.End);
30 },
31 error() {
32 /*noop*/
33 },
34 });
35 sink(
36 start(signal => {
37 if (signal === TalkbackKind.Close) subscription.unsubscribe();
38 })
39 );
40 };
41}
42
43export function toObservable<T>(source: Source<T>): Observable<T> {
44 return {
45 subscribe(observer: ObservableObserver<T>) {
46 let talkback = talkbackPlaceholder;
47 let ended = false;
48 source(signal => {
49 if (ended) {
50 /*noop*/
51 } else if (signal === SignalKind.End) {
52 ended = true;
53 observer.complete();
54 } else if (signal.tag === SignalKind.Start) {
55 (talkback = signal[0])(TalkbackKind.Pull);
56 } else {
57 observer.next(signal[0]);
58 talkback(TalkbackKind.Pull);
59 }
60 });
61 const subscription = {
62 closed: false,
63 unsubscribe() {
64 subscription.closed = true;
65 ended = true;
66 talkback(TalkbackKind.Close);
67 },
68 };
69 return subscription;
70 },
71 [observableSymbol()]() {
72 return this;
73 },
74 };
75}