Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

feat: Implement toAsyncIterable (#133)

Co-authored-by: Gustav Tiger <gustav.tiger@axis.com>

Changed files
+211
.changeset
src
+5
.changeset/beige-worms-help.md
···
+
---
+
'wonka': minor
+
---
+
+
Implement `toAsyncIterable`, converting a Wonka source to a JS Async Iterable.
+160
src/__tests__/sinks.test.ts
···
});
});
+
describe('toAsyncIterable', () => {
+
it('creates an async iterable mirroring the Wonka source', async () => {
+
let pulls = 0;
+
let sink: Sink<any> | null = null;
+
+
const source: Source<any> = _sink => {
+
sink = _sink;
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
+
};
+
+
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
+
expect(pulls).toBe(1);
+
sink!(push(0));
+
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
+
expect(pulls).toBe(2);
+
+
sink!(push(1));
+
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
+
expect(pulls).toBe(3);
+
+
sink!(SignalKind.End);
+
expect(await asyncIterator.next()).toEqual({ done: true });
+
expect(pulls).toBe(3);
+
});
+
+
it('buffers actively pushed values', async () => {
+
let pulls = 0;
+
let sink: Sink<any> | null = null;
+
+
const source: Source<any> = _sink => {
+
sink = _sink;
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
+
};
+
+
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
+
sink!(push(0));
+
sink!(push(1));
+
sink!(SignalKind.End);
+
+
expect(pulls).toBe(1);
+
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
+
expect(await asyncIterator.next()).toEqual({ value: 1, done: false });
+
expect(await asyncIterator.next()).toEqual({ done: true });
+
});
+
+
it('asynchronously waits for pulled values', async () => {
+
let pulls = 0;
+
let sink: Sink<any> | null = null;
+
+
const source: Source<any> = _sink => {
+
sink = _sink;
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
+
};
+
+
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
expect(pulls).toBe(1);
+
+
let resolved = false;
+
+
const promise = asyncIterator.next().then(value => {
+
resolved = true;
+
return value;
+
});
+
+
await Promise.resolve();
+
expect(resolved).toBe(false);
+
+
sink!(push(0));
+
sink!(SignalKind.End);
+
expect(await promise).toEqual({ value: 0, done: false });
+
expect(await asyncIterator.next()).toEqual({ done: true });
+
});
+
+
it('supports cancellation via return', async () => {
+
let ended = false;
+
let sink: Sink<any> | null = null;
+
+
const source: Source<any> = _sink => {
+
sink = _sink;
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) ended = true;
+
})
+
);
+
};
+
+
const asyncIterator = sinks.toAsyncIterable(source)[Symbol.asyncIterator]();
+
+
sink!(push(0));
+
expect(await asyncIterator.next()).toEqual({ value: 0, done: false });
+
expect(await asyncIterator.return!()).toEqual({ done: true });
+
+
sink!(push(1));
+
expect(await asyncIterator.next()).toEqual({ done: true });
+
+
expect(ended).toBeTruthy();
+
});
+
+
it('supports for-await-of', async () => {
+
let pulls = 0;
+
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
sink(pulls < 3 ? push(pulls++) : SignalKind.End);
+
}
+
})
+
);
+
};
+
+
const iterable = sinks.toAsyncIterable(source);
+
const values: any[] = [];
+
for await (const value of iterable) {
+
values.push(value);
+
}
+
+
expect(values).toEqual([0, 1, 2]);
+
});
+
+
it('supports for-await-of with early break', async () => {
+
let pulls = 0;
+
let closed = false;
+
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
sink(pulls < 3 ? push(pulls++) : SignalKind.End);
+
} else {
+
closed = true;
+
}
+
})
+
);
+
};
+
+
const iterable = sinks.toAsyncIterable(source);
+
for await (const value of iterable) {
+
expect(value).toBe(0);
+
break;
+
}
+
+
expect(closed).toBe(true);
+
});
+
});
+
describe('toObservable', () => {
it('creates an Observable mirroring the Wonka source', () => {
const next = vi.fn();
+46
src/sinks.ts
···
})(source);
}
+
const doneResult = { done: true } as IteratorReturnResult<void>;
+
+
export const toAsyncIterable = <T>(source: Source<T>): AsyncIterable<T> => ({
+
[Symbol.asyncIterator](): AsyncIterator<T> {
+
const buffer: T[] = [];
+
+
let ended = false;
+
let talkback = talkbackPlaceholder;
+
let next: ((value: IteratorResult<T>) => void) | void;
+
+
source(signal => {
+
if (ended) {
+
/*noop*/
+
} else if (signal === SignalKind.End) {
+
if (next) next = next(doneResult);
+
ended = true;
+
} else if (signal.tag === SignalKind.Start) {
+
(talkback = signal[0])(TalkbackKind.Pull);
+
} else if (next) {
+
next = next({ value: signal[0], done: false });
+
} else {
+
buffer.push(signal[0]);
+
}
+
});
+
+
return {
+
async next(): Promise<IteratorResult<T>> {
+
if (ended && !buffer.length) {
+
return doneResult;
+
} else if (!ended && buffer.length <= 1) {
+
talkback(TalkbackKind.Pull);
+
}
+
+
return buffer.length
+
? { value: buffer.shift()!, done: false }
+
: new Promise(resolve => (next = resolve));
+
},
+
async return(): Promise<IteratorReturnResult<void>> {
+
if (!ended) next = talkback(TalkbackKind.Close);
+
ended = true;
+
return doneResult;
+
},
+
};
+
},
+
});
+
export function toArray<T>(source: Source<T>): T[] {
const values: T[] = [];
let talkback = talkbackPlaceholder;