1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
2import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
3import { share } from './operators';
4
5export function fromArray<T>(array: T[]): Source<T> {
6 return sink => {
7 let ended = false;
8 let looping = false;
9 let pulled = false;
10 let current = 0;
11 sink(
12 start(signal => {
13 if (signal === TalkbackKind.Close) {
14 ended = true;
15 } else if (looping) {
16 pulled = true;
17 } else {
18 for (pulled = looping = true; pulled && !ended; current++) {
19 if (current < array.length) {
20 pulled = false;
21 sink(push(array[current]));
22 } else {
23 ended = true;
24 sink(SignalKind.End);
25 }
26 }
27 looping = false;
28 }
29 })
30 );
31 };
32}
33
34export function fromValue<T>(value: T): Source<T> {
35 return sink => {
36 let ended = false;
37 sink(
38 start(signal => {
39 if (signal === TalkbackKind.Close) {
40 ended = true;
41 } else if (!ended) {
42 ended = true;
43 sink(push(value));
44 sink(SignalKind.End);
45 }
46 })
47 );
48 };
49}
50
51export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
52 return sink => {
53 let ended = false;
54 const teardown = produce({
55 next(value: T) {
56 if (!ended) sink(push(value));
57 },
58 complete() {
59 if (!ended) {
60 ended = true;
61 sink(SignalKind.End);
62 }
63 },
64 });
65 sink(
66 start(signal => {
67 if (signal === TalkbackKind.Close && !ended) {
68 ended = true;
69 teardown();
70 }
71 })
72 );
73 };
74}
75
76export function makeSubject<T>(): Subject<T> {
77 let next: Subject<T>['next'] | void;
78 let complete: Subject<T>['complete'] | void;
79 return {
80 source: share(
81 make(observer => {
82 next = observer.next;
83 complete = observer.complete;
84 return teardownPlaceholder;
85 })
86 ),
87 next(value: T) {
88 if (next) next(value);
89 },
90 complete() {
91 if (complete) complete();
92 },
93 };
94}
95
96export const empty: Source<any> = (sink: Sink<any>): void => {
97 let ended = false;
98 sink(
99 start(signal => {
100 if (signal === TalkbackKind.Close) {
101 ended = true;
102 } else if (!ended) {
103 ended = true;
104 sink(SignalKind.End);
105 }
106 })
107 );
108};
109
110export const never: Source<any> = (sink: Sink<any>): void => {
111 sink(start(talkbackPlaceholder));
112};
113
114export function interval(ms: number): Source<number> {
115 return make(observer => {
116 let i = 0;
117 const id = setInterval(() => observer.next(i++), ms);
118 return () => clearInterval(id);
119 });
120}
121
122export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
123 return make(observer => {
124 element.addEventListener(event, observer.next);
125 return () => element.removeEventListener(event, observer.next);
126 });
127}
128
129export function fromPromise<T>(promise: Promise<T>): Source<T> {
130 return make(observer => {
131 promise.then(value => {
132 Promise.resolve(value).then(() => {
133 observer.next(value);
134 observer.complete();
135 });
136 });
137 return teardownPlaceholder;
138 });
139}