1import { Source, SignalKind, TalkbackKind } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
4interface ObservableSubscription {
5 closed?: boolean;
6 unsubscribe(): void;
7}
8
9interface ObservableObserver<T> {
10 next(value: T): void;
11 error(error: any): void;
12 complete(): void;
13}
14
15interface Observable<T> {
16 subscribe(observer: ObservableObserver<T>): ObservableSubscription;
17}
18
19const observableSymbol = (): symbol =>
20 (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable'));
21
22export function fromObservable<T>(input: Observable<T>): Source<T> {
23 input = input[observableSymbol()] ? (input as any)[observableSymbol()]() : input;
24 return sink => {
25 const subscription = input.subscribe({
26 next(value: T) {
27 sink(push(value));
28 },
29 complete() {
30 sink(SignalKind.End);
31 },
32 error() {
33 /*noop*/
34 },
35 });
36 sink(
37 start(signal => {
38 if (signal === TalkbackKind.Close) subscription.unsubscribe();
39 })
40 );
41 };
42}
43
44export function toObservable<T>(source: Source<T>): Observable<T> {
45 return {
46 subscribe(observer: ObservableObserver<T>) {
47 let talkback = talkbackPlaceholder;
48 let ended = false;
49 source(signal => {
50 if (ended) {
51 /*noop*/
52 } else if (signal === SignalKind.End) {
53 ended = true;
54 observer.complete();
55 } else if (signal.tag === SignalKind.Start) {
56 (talkback = signal[0])(TalkbackKind.Pull);
57 } else {
58 observer.next(signal[0]);
59 talkback(TalkbackKind.Pull);
60 }
61 });
62 const subscription = {
63 closed: false,
64 unsubscribe() {
65 subscription.closed = true;
66 ended = true;
67 talkback(TalkbackKind.Close);
68 },
69 };
70 return subscription;
71 },
72 [observableSymbol()]() {
73 return this;
74 },
75 };
76}