1import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
2import { push, start, talkbackPlaceholder } from './helpers';
3
4export function fromArray<T>(array: T[]): Source<T> {
5 return sink => {
6 let ended = false;
7 let looping = false;
8 let pulled = false;
9 let current = 0;
10 sink(
11 start(signal => {
12 if (signal === TalkbackKind.Close) {
13 ended = true;
14 } else if (looping) {
15 pulled = true;
16 } else {
17 for (pulled = looping = true; pulled && !ended; current++) {
18 if (current < array.length) {
19 pulled = false;
20 sink(push(array[current]));
21 } else {
22 ended = true;
23 sink(SignalKind.End);
24 }
25 }
26 looping = false;
27 }
28 })
29 );
30 };
31}
32
33export function fromValue<T>(value: T): Source<T> {
34 return sink => {
35 let ended = false;
36 sink(
37 start(signal => {
38 if (signal === TalkbackKind.Close) {
39 ended = true;
40 } else if (!ended) {
41 ended = true;
42 sink(push(value));
43 sink(SignalKind.End);
44 }
45 })
46 );
47 };
48}
49
50export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
51 return sink => {
52 let ended = false;
53 const teardown = produce({
54 next(value: T) {
55 if (!ended) sink(push(value));
56 },
57 complete() {
58 if (!ended) {
59 ended = true;
60 sink(SignalKind.End);
61 }
62 },
63 });
64 sink(
65 start(signal => {
66 if (signal === TalkbackKind.Close && !ended) {
67 ended = true;
68 teardown();
69 }
70 })
71 );
72 };
73}
74
75export function makeSubject<T>(): Subject<T> {
76 let sinks: Sink<T>[] = [];
77 let ended = false;
78 return {
79 source(sink: Sink<T>) {
80 sinks.push(sink);
81 sink(
82 start(signal => {
83 if (signal === TalkbackKind.Close) {
84 const index = sinks.indexOf(sink);
85 if (index > -1) (sinks = sinks.slice()).splice(index, 1);
86 }
87 })
88 );
89 },
90 next(value: T) {
91 if (!ended) {
92 const signal = push(value);
93 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
94 }
95 },
96 complete() {
97 if (!ended) {
98 ended = true;
99 for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
100 }
101 },
102 };
103}
104
105export const empty: Source<any> = (sink: Sink<any>): void => {
106 let ended = false;
107 sink(
108 start(signal => {
109 if (signal === TalkbackKind.Close) {
110 ended = true;
111 } else if (!ended) {
112 ended = true;
113 sink(SignalKind.End);
114 }
115 })
116 );
117};
118
119export const never: Source<any> = (sink: Sink<any>): void => {
120 sink(start(talkbackPlaceholder));
121};
122
123export function interval(ms: number): Source<number> {
124 return sink => {
125 let i = 0;
126 const id = setInterval(() => {
127 sink(push(i++));
128 }, ms);
129 sink(
130 start(signal => {
131 if (signal === TalkbackKind.Close) clearInterval(id);
132 })
133 );
134 };
135}
136
137export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
138 return sink => {
139 const handler = (payload: Event) => {
140 sink(push(payload));
141 };
142 sink(
143 start(signal => {
144 if (signal === TalkbackKind.Close) element.removeEventListener(event, handler);
145 })
146 );
147 element.addEventListener(event, handler);
148 };
149}
150
151export function fromPromise<T>(promise: Promise<T>): Source<T> {
152 return sink => {
153 let ended = false;
154 promise.then(value => {
155 if (!ended) {
156 sink(push(value));
157 sink(SignalKind.End);
158 }
159 });
160 sink(
161 start(signal => {
162 if (signal === TalkbackKind.Close) ended = true;
163 })
164 );
165 };
166}