1import { Source, Subscription, TalkbackKind, SignalKind } from './types';
2import { talkbackPlaceholder } from './helpers';
3
/**
 * Creates a sink that starts a source and hands every pushed value to
 * `subscriber`.
 *
 * Signal handling:
 * - Start: stores the talkback delivered with the signal and requests the
 *   first value (or closes the source if the subscription is already over).
 * - Push: passes the value to `subscriber`, then requests the next value.
 * - End: marks the subscription as finished; later pushes are ignored.
 *
 * @param subscriber - Callback invoked with each value pushed by the source.
 * @returns A function that takes a source, starts it, and returns a
 *   `Subscription` whose `unsubscribe()` sends Close to the source at most
 *   once and is a no-op after the source has ended.
 */
export function subscribe<T>(subscriber: (value: T) => void) {
  return (source: Source<T>): Subscription => {
    // Stand-in until the source delivers its real talkback with Start
    // (presumably a no-op function — confirm in ./helpers).
    let talkback = talkbackPlaceholder;
    let ended = false;

    source(signal => {
      if (signal === SignalKind.End) {
        ended = true;
      } else if (signal.tag === SignalKind.Start) {
        talkback = signal[0];
        // If unsubscribe() ran before Start arrived, its Close only reached
        // the placeholder. Close the real talkback now instead of pulling,
        // otherwise the source would keep running with its values dropped.
        talkback(ended ? TalkbackKind.Close : TalkbackKind.Pull);
      } else if (!ended) {
        subscriber(signal[0]);
        // The callback may have unsubscribed (or caused the source to end);
        // do not request more values from a finished source.
        if (!ended) talkback(TalkbackKind.Pull);
      }
    });

    return {
      unsubscribe() {
        if (!ended) {
          ended = true;
          talkback(TalkbackKind.Close);
        }
      },
    };
  };
}
28
/**
 * Builds a sink that feeds every value emitted by a source to `subscriber`.
 *
 * This delegates to `subscribe` and discards the subscription it returns, so
 * the caller gets no handle for stopping the stream early.
 *
 * @param subscriber - Callback invoked with each value.
 * @returns A function that starts consuming the given source and returns
 *   nothing.
 */
export function forEach<T>(subscriber: (value: T) => void) {
  return (source: Source<T>): void => {
    const attach = subscribe(subscriber);
    attach(source);
  };
}
34
/**
 * Starts a source and ignores every value it emits.
 *
 * Implemented as `subscribe` with an empty callback; the resulting
 * subscription is discarded.
 *
 * @param source - The source to start.
 */
export function publish<T>(source: Source<T>): void {
  const ignore = (_value: T): void => {
    /* values are intentionally dropped */
  };
  const run = subscribe(ignore);
  run(source);
}
40
/**
 * Collects the values a source pushes during this call into an array.
 *
 * The source is started and pulled repeatedly; whatever it has pushed by the
 * time `source(...)` returns is the result. If the source has not ended by
 * then, it is closed, and anything it delivers afterwards is ignored so the
 * returned array is never modified after the fact.
 *
 * @param source - The source to drain.
 * @returns The values pushed before this function returned, in push order.
 */
export function toArray<T>(source: Source<T>): T[] {
  const values: T[] = [];
  // Stand-in until the source delivers its real talkback with Start
  // (presumably a no-op function — confirm in ./helpers).
  let talkback = talkbackPlaceholder;
  let ended = false;

  source(signal => {
    if (signal === SignalKind.End) {
      ended = true;
    } else if (signal.tag === SignalKind.Start) {
      talkback = signal[0];
      // A Start that arrives after collection has finished must be closed
      // rather than pulled; the earlier Close only reached the placeholder.
      talkback(ended ? TalkbackKind.Close : TalkbackKind.Pull);
    } else if (!ended) {
      // Guarded like `subscribe`: pushes after End/Close are dropped.
      values.push(signal[0]);
      talkback(TalkbackKind.Pull);
    }
  });

  if (!ended) {
    // Mark as finished before closing so late signals hit the guards above.
    ended = true;
    talkback(TalkbackKind.Close);
  }
  return values;
}
58
/**
 * Converts a source into a promise for the last value it pushes.
 *
 * The source is started and pulled after every push; each pushed value
 * overwrites the previous one. When the source signals End, the promise
 * resolves with the most recent value. If the source ends without pushing
 * anything, the promise resolves with `undefined` even though it is typed as
 * `T`. The promise is never rejected, and it stays pending if the source
 * never ends.
 *
 * @param source - The source to consume.
 * @returns A promise resolved with the last pushed value once the source ends.
 */
export function toPromise<T>(source: Source<T>): Promise<T> {
  return new Promise(settle => {
    // Stand-in until the source delivers its real talkback with Start.
    let request = talkbackPlaceholder;
    let latest: T | void;

    source(signal => {
      if (signal === SignalKind.End) {
        settle(latest!);
        return;
      }

      if (signal.tag === SignalKind.Start) {
        request = signal[0];
        request(TalkbackKind.Pull);
        return;
      }

      // Push: remember the value and ask for the next one.
      latest = signal[0];
      request(TalkbackKind.Pull);
    });
  });
}