Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

feat: Implement fromIterable and replace fromArray (#115)

* Replace array-only implementation with iterable

* Add fromAsyncIterable

* Add wonka@4 to suite

* Rename factory to lazy

* Combine functions for ease of use

* Split fromIterable/fromAsyncIterable back out

That's simply less bytes somehow

* Fix fromPromise test

Changed files
+83 -7
perf
src
+1
perf/package.json
···
"benchr": "^4.3.0"
},
"dependencies": {
+
"wonka-v4": "npm:wonka@^4.0.0",
"most": "^1.7.3",
"rxjs": "^6.3.3"
}
+11
perf/suite.js
···
const Wonka = require('..');
+
const Wonka4 = require('wonka-v4');
const Rx = require('rxjs');
const RxOperators = require('rxjs/operators');
const most = require('most');
···
Wonka.filter(x => x > 4),
Wonka.scan((acc, x) => acc + x, 0),
Wonka.toPromise
+
);
+
});
+
+
benchmark('Wonka v4', () => {
+
return Wonka4.pipe(
+
Wonka4.fromArray(input),
+
Wonka4.map(x => x * 2),
+
Wonka4.filter(x => x > 4),
+
Wonka4.scan((acc, x) => acc + x, 0),
+
Wonka4.toPromise
);
});
+5
perf/yarn.lock
···
integrity sha1-8LDc+RW8X/FSivrbLA4XtTLaL+g=
dependencies:
defaults "^1.0.3"
+
+
"wonka-v4@npm:wonka@^4.0.0":
+
version "4.0.15"
+
resolved "https://registry.yarnpkg.com/wonka/-/wonka-4.0.15.tgz#9aa42046efa424565ab8f8f451fcca955bf80b89"
+
integrity sha512-U0IUQHKXXn6PFo9nqsHphVCE5m3IntqZNB9Jjn7EB1lrR7YTDY3YWgFvEvwniTzXSvOH/XMzAZaIfJF/LvHYXg==
+1
src/__tests__/sources.test.ts
···
expect(signals).toEqual([start(expect.any(Function))]);
+
await Promise.resolve();
await promise;
await Promise.resolve();
+65 -7
src/sources.ts
···
import { push, start, talkbackPlaceholder, teardownPlaceholder } from './helpers';
import { share } from './operators';
-
export function fromArray<T>(array: T[]): Source<T> {
+
export function lazy<T>(make: () => Source<T>): Source<T> {
+
return sink => make()(sink);
+
}
+
+
export function fromAsyncIterable<T>(iterable: AsyncIterable<T>): Source<T> {
return sink => {
+
const iterator = iterable[Symbol.asyncIterator]();
let ended = false;
let looping = false;
let pulled = false;
-
let current = 0;
+
let next: IteratorResult<T>;
sink(
-
start(signal => {
+
start(async signal => {
if (signal === TalkbackKind.Close) {
ended = true;
+
if (iterator.return) iterator.return();
} else if (looping) {
pulled = true;
} else {
-
for (pulled = looping = true; pulled && !ended; current++) {
-
if (current < array.length) {
-
pulled = false;
-
sink(push(array[current]));
+
for (pulled = looping = true; pulled && !ended; ) {
+
if ((next = await iterator.next()).done) {
+
ended = true;
+
if (iterator.return) await iterator.return();
+
sink(SignalKind.End);
} else {
+
try {
+
pulled = false;
+
sink(push(next.value));
+
} catch (error) {
+
if (iterator.throw) {
+
if ((ended = !!(await iterator.throw(error)).done)) sink(SignalKind.End);
+
} else {
+
throw error;
+
}
+
}
+
}
+
}
+
looping = false;
+
}
+
})
+
);
+
};
+
}
+
+
export function fromIterable<T>(iterable: Iterable<T> | AsyncIterable<T>): Source<T> {
+
if (iterable[Symbol.asyncIterator]) return fromAsyncIterable(iterable as AsyncIterable<T>);
+
return sink => {
+
const iterator = iterable[Symbol.iterator]();
+
let ended = false;
+
let looping = false;
+
let pulled = false;
+
let next: IteratorResult<T>;
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
ended = true;
+
if (iterator.return) iterator.return();
+
} else if (looping) {
+
pulled = true;
+
} else {
+
for (pulled = looping = true; pulled && !ended; ) {
+
if ((next = iterator.next()).done) {
ended = true;
+
if (iterator.return) iterator.return();
sink(SignalKind.End);
+
} else {
+
try {
+
pulled = false;
+
sink(push(next.value));
+
} catch (error) {
+
if (iterator.throw) {
+
if ((ended = !!iterator.throw(error).done)) sink(SignalKind.End);
+
} else {
+
throw error;
+
}
+
}
}
}
looping = false;
···
);
};
}
+
+
export const fromArray: <T>(array: T[]) => Source<T> = fromIterable;
export function fromValue<T>(value: T): Source<T> {
return sink => {