1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
/**
 * Handle returned from `Observable.subscribe`, used by the consumer to
 * cancel the subscription it belongs to.
 */
interface ObservableSubscription {
  /** Cancels the subscription. */
  unsubscribe(): void;
  /** Optional flag reporting whether the subscription has been closed. */
  closed?: boolean;
}
8
/**
 * Set of callbacks handed to `Observable.subscribe`.
 *
 * @typeParam T - type of the values delivered through `next`.
 */
interface ObservableObserver<T> {
  /** Receives each emitted value. */
  next(value: T): void;
  /** Receives the completion notification. */
  complete(): void;
  /** Receives an error notification. */
  error(error: any): void;
}
14
/**
 * Minimal Observable shape used by this module: anything exposing a
 * `subscribe` method that accepts an observer object and hands back a
 * subscription.
 *
 * @typeParam T - type of the values delivered to the observer.
 */
interface Observable<T> {
  /** Starts delivering notifications to `observer` and returns the handle used to cancel. */
  subscribe(observer: ObservableObserver<T>): ObservableSubscription;
}
18
/**
 * Key under which an object exposes its Observable interop method.
 *
 * When `Symbol` is available this is `Symbol.observable`; if that well-known
 * symbol is missing it is created and installed on the global `Symbol`
 * constructor as a side effect. Without `Symbol` support the string
 * `'@@observable'` is used instead.
 */
const observableSymbol: unique symbol = (() => {
  if (typeof Symbol !== 'function') return '@@observable';

  const symbolCtor = Symbol as any;
  let key = symbolCtor.observable;
  if (!key) {
    // Install the symbol globally so that later lookups resolve to the same key.
    key = Symbol('observable');
    symbolCtor.observable = key;
  }
  return key;
})();
23
/**
 * Converts an Observable into a Source.
 *
 * If `input` exposes an interop method under `observableSymbol`, that method
 * is invoked (with `input` as `this`) and its result is subscribed to;
 * otherwise `input` itself is subscribed to.
 *
 * Each time the returned Source is started with a sink it subscribes to the
 * Observable, forwards every `next` value as a push signal and `complete` as
 * `SignalKind.End`. The talkback passed to the sink unsubscribes from the
 * Observable when it receives `TalkbackKind.Close`; other talkback signals
 * are ignored.
 *
 * Note that the subscription is created before the start signal is sent, so
 * an Observable that emits synchronously delivers those signals to the sink
 * ahead of the start signal.
 *
 * @param input - the Observable (or Observable-interop object) to convert.
 * @returns a Source emitting the Observable's values.
 * @throws rethrows, from the observer's `error` callback, any error the
 *   Observable reports; how it surfaces depends on the Observable
 *   implementation.
 */
export function fromObservable<T>(input: Observable<T>): Source<T> {
  // The interop key is not declared on the local `Observable` type, so it is
  // read once through a narrowly scoped cast and validated before being called.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const interop: unknown = (input as any)[observableSymbol];
  const observable: Observable<T> =
    typeof interop === 'function' ? interop.call(input) : input;

  return sink => {
    const subscription = observable.subscribe({
      next(value: T) {
        sink(push(value));
      },
      complete() {
        sink(SignalKind.End);
      },
      error(error: unknown) {
        // Nothing in this block can forward an error to the sink. Rethrow
        // rather than silently dropping it, which would hide the failure and
        // leave the sink waiting for an end signal that never arrives.
        throw error;
      },
    });

    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) subscription.unsubscribe();
      })
    );
  };
}
47
/**
 * Converts a Source into an Observable.
 *
 * Every `subscribe` call starts the Source with a fresh sink. The sink pulls
 * once when it receives the start signal and once more after each pushed
 * value, forwards pushed values to `observer.next`, and calls
 * `observer.complete` when the Source ends. The Observable's `error`
 * callback is never invoked by this function.
 *
 * The returned subscription's `closed` flag becomes `true` when the Source
 * ends or when `unsubscribe` is called. `unsubscribe` is idempotent: it sends
 * `TalkbackKind.Close` to the Source at most once, and not at all if the
 * Source has already ended.
 *
 * The returned object also exposes an interop method under
 * `observableSymbol` that returns the Observable itself.
 *
 * @param source - the Source to expose as an Observable.
 * @returns an Observable that runs `source` once per subscriber.
 */
export function toObservable<T>(source: Source<T>): Observable<T> {
  const observable = {
    subscribe(observer: ObservableObserver<T>) {
      let talkback = talkbackPlaceholder;
      let ended = false;

      // Created before the Source is started so that a Source which ends
      // synchronously can already mark the subscription as closed.
      const subscription = {
        closed: false,
        unsubscribe() {
          // Only the first call on a still-active subscription closes the
          // Source; repeated calls and calls after completion are no-ops.
          if (!ended) {
            ended = true;
            subscription.closed = true;
            talkback(TalkbackKind.Close);
          }
        },
      };

      source(signal => {
        if (ended) {
          /* ignore anything arriving after completion or cancellation */
        } else if (signal === SignalKind.End) {
          ended = true;
          subscription.closed = true;
          observer.complete();
        } else if (signal.tag === SignalKind.Start) {
          // Remember the talkback and request the first value.
          (talkback = signal[0])(TalkbackKind.Pull);
        } else {
          observer.next(signal[0]);
          talkback(TalkbackKind.Pull);
        }
      });

      return subscription;
    },
    // Interop method: an Observable is its own interop Observable.
    [observableSymbol](): Observable<T> {
      return observable;
    },
  };
  return observable;
}