Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

chore: Derive common Wonka sources from make source (#114)

* Replace makeSubject with make derivative

* Reuse make for common sources

Changed files
+29 -55
src
+1
src/__tests__/sources.test.ts
···
expect(signals).toEqual([start(expect.any(Function))]);
await promise;
+
await Promise.resolve();
expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
});
+28 -55
src/sources.ts
···
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
-
import { push, start, talkbackPlaceholder } from './helpers';
+
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
+
import { share } from './operators';
export function fromArray<T>(array: T[]): Source<T> {
return sink => {
···
}
export function makeSubject<T>(): Subject<T> {
-
let sinks: Sink<T>[] = [];
-
let ended = false;
+
let next: Subject<T>['next'] | void;
+
let complete: Subject<T>['complete'] | void;
return {
-
source(sink: Sink<T>) {
-
sinks.push(sink);
-
sink(
-
start(signal => {
-
if (signal === TalkbackKind.Close) {
-
const index = sinks.indexOf(sink);
-
if (index > -1) (sinks = sinks.slice()).splice(index, 1);
-
}
-
})
-
);
-
},
+
source: share(
+
make(observer => {
+
next = observer.next;
+
complete = observer.complete;
+
return teardownPlaceholder;
+
})
+
),
next(value: T) {
-
if (!ended) {
-
const signal = push(value);
-
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
-
}
+
if (next) next(value);
},
complete() {
-
if (!ended) {
-
ended = true;
-
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
-
}
+
if (complete) complete();
},
};
}
···
};
export function interval(ms: number): Source<number> {
-
return sink => {
+
return make(observer => {
let i = 0;
-
const id = setInterval(() => {
-
sink(push(i++));
-
}, ms);
-
sink(
-
start(signal => {
-
if (signal === TalkbackKind.Close) clearInterval(id);
-
})
-
);
-
};
+
const id = setInterval(() => observer.next(i++), ms);
+
return () => clearInterval(id);
+
});
}
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
-
return sink => {
-
const handler = (payload: Event) => {
-
sink(push(payload));
-
};
-
sink(
-
start(signal => {
-
if (signal === TalkbackKind.Close) element.removeEventListener(event, handler);
-
})
-
);
-
element.addEventListener(event, handler);
-
};
+
return make(observer => {
+
element.addEventListener(event, observer.next);
+
return () => element.removeEventListener(event, observer.next);
+
});
}
export function fromPromise<T>(promise: Promise<T>): Source<T> {
-
return sink => {
-
let ended = false;
+
return make(observer => {
promise.then(value => {
-
if (!ended) {
-
sink(push(value));
-
sink(SignalKind.End);
-
}
+
Promise.resolve(value).then(() => {
+
observer.next(value);
+
observer.complete();
+
});
});
-
sink(
-
start(signal => {
-
if (signal === TalkbackKind.Close) ended = true;
-
})
-
);
-
};
+
return teardownPlaceholder;
+
});
}