import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';

// NOTE(review): the type arguments on `Source`, `Sink`, `Observer` and `Subject` below assume
// these project types are generic over the emitted value type — confirm against './types'.

/**
 * Defers creation of a source until a sink subscribes.
 *
 * `make` is invoked once per subscription, so each sink is handed to a freshly created source.
 *
 * @param make - Factory returning the source the sink should be subscribed to.
 * @returns A source that calls `make()` and passes the sink on to its result.
 */
export function lazy<T>(make: () => Source<T>): Source<T> {
  return sink => make()(sink);
}

/**
 * Creates a source that emits the values of an async iterable.
 *
 * Every talkback signal other than `TalkbackKind.Close` is treated as a request for one more
 * value. `TalkbackKind.Close` stops the source and calls the iterator's `return()` if present.
 *
 * If the sink throws while receiving a value, the error is passed to the iterator's `throw()`
 * when it exists; otherwise it is rethrown, which rejects the promise returned by the async
 * talkback.
 *
 * @param iterable - The async iterable to read from; a new iterator is obtained per subscription.
 * @returns A source emitting each iterated value, followed by `SignalKind.End` once the iterator
 *   reports `done`.
 */
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
  return sink => {
    const iterator = iterable[Symbol.asyncIterator]();
    // Set once the sink closed, the iterator finished, or `throw()` reported completion.
    let ended = false;
    // True while the pull loop below is running; guards against starting a second loop.
    let looping = false;
    // Records that another value was requested while the loop was already running.
    let pulled = false;
    sink(
      start(async signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            const next = await iterator.next();
            // The sink may have closed while `next()` was pending. In that case `return()` has
            // already been called by the Close branch and nothing more may be delivered.
            if (ended) break;
            if (next.done) {
              ended = true;
              if (iterator.return) await iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so a pull issued synchronously by the sink is observed.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                if (iterator.throw) {
                  ended = !!(await iterator.throw(error)).done;
                  if (ended) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/**
 * Creates a source that emits the values of an iterable.
 *
 * Inputs exposing a truthy `Symbol.asyncIterator` member are delegated to
 * {@link fromAsyncIterable}. Otherwise values are read synchronously, one per talkback signal
 * other than `TalkbackKind.Close`. `TalkbackKind.Close` stops the source and calls the
 * iterator's `return()` if present.
 *
 * If the sink throws while receiving a value, the error is passed to the iterator's `throw()`
 * when it exists; otherwise it is rethrown to the caller of the talkback.
 *
 * @param iterable - A sync or async iterable; a new iterator is obtained per subscription.
 * @returns A source emitting each iterated value, followed by `SignalKind.End` once the iterator
 *   reports `done`.
 */
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
  // Type-only casts: the runtime check is the same truthiness test on the symbol-keyed member.
  if ((iterable as Partial<AsyncIterable<T>>)[Symbol.asyncIterator]) {
    return fromAsyncIterable(iterable as AsyncIterable<T>);
  }
  return sink => {
    const iterator = (iterable as Iterable<T>)[Symbol.iterator]();
    // Set once the sink closed, the iterator finished, or `throw()` reported completion.
    let ended = false;
    // True while the pull loop below is running; guards against re-entrant loops.
    let looping = false;
    // Records that another value was requested while the loop was already running.
    let pulled = false;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
          if (iterator.return) iterator.return();
        } else if (looping) {
          pulled = true;
        } else {
          for (pulled = looping = true; pulled && !ended; ) {
            const next = iterator.next();
            if (next.done) {
              ended = true;
              if (iterator.return) iterator.return();
              sink(SignalKind.End);
            } else {
              try {
                // Reset before pushing so a pull issued synchronously by the sink is observed.
                pulled = false;
                sink(push(next.value));
              } catch (error) {
                if (iterator.throw) {
                  ended = !!iterator.throw(error).done;
                  if (ended) sink(SignalKind.End);
                } else {
                  throw error;
                }
              }
            }
          }
          looping = false;
        }
      })
    );
  };
}

/**
 * Creates a source that emits the elements of an array.
 *
 * This is {@link fromIterable} exposed under an array-specific signature.
 */
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;

/**
 * Creates a source that emits a single value and then ends.
 *
 * The value and `SignalKind.End` are sent in response to the first talkback signal other than
 * `TalkbackKind.Close`. Nothing is emitted if the sink closes first.
 *
 * @param value - The value to emit.
 * @returns A source emitting `value` once, followed by `SignalKind.End`.
 */
export function fromValue<T>(value: T): Source<T> {
  return sink => {
    let ended = false;
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close) {
          ended = true;
        } else if (!ended) {
          ended = true;
          sink(push(value));
          sink(SignalKind.End);
        }
      })
    );
  };
}

/**
 * Creates a source from a producer callback that drives an observer.
 *
 * `produce` runs once per subscription, before the sink receives its start signal. Values passed
 * to `observer.next` are forwarded until `observer.complete` is called or the sink closes.
 *
 * The returned teardown is invoked only when the sink sends `TalkbackKind.Close` before the
 * source has ended. It is not invoked after `observer.complete()`.
 *
 * @param produce - Receives the observer and returns a teardown function.
 * @returns A source backed by the producer.
 */
export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
  return sink => {
    let ended = false;
    const teardown = produce({
      next(value: T) {
        if (!ended) sink(push(value));
      },
      complete() {
        if (!ended) {
          ended = true;
          sink(SignalKind.End);
        }
      },
    });
    sink(
      start(signal => {
        if (signal === TalkbackKind.Close && !ended) {
          ended = true;
          teardown();
        }
      })
    );
  };
}

/**
 * Creates a subject: a source paired with `next` and `complete` methods that feed it.
 *
 * The underlying observer is captured when the producer inside `make` runs. Until then, calls to
 * `next` and `complete` are silently ignored.
 *
 * @returns An object exposing `source`, `next` and `complete`.
 */
export function makeSubject<T>(): Subject<T> {
  let next: Subject<T>['next'] | undefined;
  let complete: Subject<T>['complete'] | undefined;
  return {
    source: share(
      make<T>(observer => {
        next = observer.next;
        complete = observer.complete;
        return teardownPlaceholder;
      })
    ),
    next(value: T) {
      if (next) next(value);
    },
    complete() {
      if (complete) complete();
    },
  };
}

/**
 * A source that emits no values and ends on the first talkback signal other than
 * `TalkbackKind.Close`. Nothing is emitted if the sink closes first.
 *
 * Typed with `any` so the constant can be used wherever a source of any value type is expected.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const empty: Source<any> = (sink: Sink<any>): void => {
  let ended = false;
  sink(
    start(signal => {
      if (signal === TalkbackKind.Close) {
        ended = true;
      } else if (!ended) {
        ended = true;
        sink(SignalKind.End);
      }
    })
  );
};

/**
 * A source that sends only its start signal (with `talkbackPlaceholder` as talkback) and never
 * emits a value or `SignalKind.End`.
 *
 * Typed with `any` so the constant can be used wherever a source of any value type is expected.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const never: Source<any> = (sink: Sink<any>): void => {
  sink(start(talkbackPlaceholder));
};

/**
 * Creates a source that emits an incrementing counter, starting at 0, on a `setInterval` timer.
 *
 * The timer is cleared when the sink closes the source.
 *
 * @param ms - Delay passed to `setInterval`, in milliseconds.
 * @returns A source emitting 0, 1, 2, … once per timer tick.
 */
export function interval(ms: number): Source<number> {
  return make<number>(observer => {
    let i = 0;
    const id = setInterval(() => observer.next(i++), ms);
    return () => clearInterval(id);
  });
}

/**
 * Creates a source that emits the events dispatched to a DOM element.
 *
 * A listener is added on subscription and removed when the sink closes the source.
 *
 * @param element - The element to listen on.
 * @param event - The event name passed to `addEventListener`.
 * @returns A source emitting each received event object.
 */
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
  return make<Event>(observer => {
    element.addEventListener(event, observer.next);
    return () => element.removeEventListener(event, observer.next);
  });
}

/**
 * Creates a source that emits a promise's fulfilment value and then completes.
 *
 * Delivery goes through an additional `Promise.resolve(value).then(...)` hop after the promise
 * settles. Rejections are not handled here: a rejected promise produces no signal and is left
 * to the runtime's unhandled-rejection behaviour.
 *
 * @param promise - The promise whose fulfilment value should be emitted.
 * @returns A source emitting the resolved value once, then completing.
 */
export function fromPromise<T>(promise: Promise<T>): Source<T> {
  return make<T>(observer => {
    promise.then(value => {
      Promise.resolve(value).then(() => {
        observer.next(value);
        observer.complete();
      });
    });
    return teardownPlaceholder;
  });
}