1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
2import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
3import { share } from './operators';
4
5export function lazy<T>(make: () => Source<T>): Source<T> {
6 return sink => make()(sink);
7}
8
9export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
10 return sink => {
11 const iterator = iterable[Symbol.asyncIterator]();
12 let ended = false;
13 let looping = false;
14 let pulled = false;
15 let next: IteratorResult<T>;
16 sink(
17 start(async signal => {
18 if (signal === TalkbackKind.Close) {
19 ended = true;
20 if (iterator.return) iterator.return();
21 } else if (looping) {
22 pulled = true;
23 } else {
24 for (pulled = looping = true; pulled && !ended; ) {
25 if ((next = await iterator.next()).done) {
26 ended = true;
27 if (iterator.return) await iterator.return();
28 sink(SignalKind.End);
29 } else {
30 try {
31 pulled = false;
32 sink(push(next.value));
33 } catch (error) {
34 if (iterator.throw) {
35 if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End);
36 } else {
37 throw error;
38 }
39 }
40 }
41 }
42 looping = false;
43 }
44 })
45 );
46 };
47}
48
49export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
50 if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);
51 return sink => {
52 const iterator = iterable[Symbol.iterator]();
53 let ended = false;
54 let looping = false;
55 let pulled = false;
56 let next: IteratorResult<T>;
57 sink(
58 start(signal => {
59 if (signal === TalkbackKind.Close) {
60 ended = true;
61 if (iterator.return) iterator.return();
62 } else if (looping) {
63 pulled = true;
64 } else {
65 for (pulled = looping = true; pulled && !ended; ) {
66 if ((next = iterator.next()).done) {
67 ended = true;
68 if (iterator.return) iterator.return();
69 sink(SignalKind.End);
70 } else {
71 try {
72 pulled = false;
73 sink(push(next.value));
74 } catch (error) {
75 if (iterator.throw) {
76 if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
77 } else {
78 throw error;
79 }
80 }
81 }
82 }
83 looping = false;
84 }
85 })
86 );
87 };
88}
89
90export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
91
92export function fromValue<T>(value: T): Source<T> {
93 return sink => {
94 let ended = false;
95 sink(
96 start(signal => {
97 if (signal === TalkbackKind.Close) {
98 ended = true;
99 } else if (!ended) {
100 ended = true;
101 sink(push(value));
102 sink(SignalKind.End);
103 }
104 })
105 );
106 };
107}
108
109export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
110 return sink => {
111 let ended = false;
112 const teardown = produce({
113 next(value: T) {
114 if (!ended) sink(push(value));
115 },
116 complete() {
117 if (!ended) {
118 ended = true;
119 sink(SignalKind.End);
120 }
121 },
122 });
123 sink(
124 start(signal => {
125 if (signal === TalkbackKind.Close && !ended) {
126 ended = true;
127 teardown();
128 }
129 })
130 );
131 };
132}
133
134export function makeSubject<T>(): Subject<T> {
135 let next: Subject<T>['next'] | void;
136 let complete: Subject<T>['complete'] | void;
137 return {
138 source: share(
139 make(observer => {
140 next = observer.next;
141 complete = observer.complete;
142 return teardownPlaceholder;
143 })
144 ),
145 next(value: T) {
146 if (next) next(value);
147 },
148 complete() {
149 if (complete) complete();
150 },
151 };
152}
153
154export const empty: Source<any> = (sink: Sink<any>): void => {
155 let ended = false;
156 sink(
157 start(signal => {
158 if (signal === TalkbackKind.Close) {
159 ended = true;
160 } else if (!ended) {
161 ended = true;
162 sink(SignalKind.End);
163 }
164 })
165 );
166};
167
168export const never: Source<any> = (sink: Sink<any>): void => {
169 sink(start(talkbackPlaceholder));
170};
171
172export function interval(ms: number): Source<number> {
173 return make(observer => {
174 let i = 0;
175 const id = setInterval(() => observer.next(i++), ms);
176 return () => clearInterval(id);
177 });
178}
179
180export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
181 return make(observer => {
182 element.addEventListener(event, observer.next);
183 return () => element.removeEventListener(event, observer.next);
184 });
185}
186
187export function fromPromise<T>(promise: Promise<T>): Source<T> {
188 return make(observer => {
189 promise.then(value => {
190 Promise.resolve(value).then(() => {
191 observer.next(value);
192 observer.complete();
193 });
194 });
195 return teardownPlaceholder;
196 });
197}