Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

feat: Implement variadic zip and combine (#116)

* Reimplement combine as variadic function

* Implement zip & combine

* Fix up combine types

* Support objects in zip source

* Add additional combine test

* Move combine/zip tests to separate file

* Move size check in zip

* Fix up tuple mapping type

+1 -1
package.json
···
"rollup": "^2.77.3",
"rollup-plugin-terser": "^7.0.2",
"tslib": "^2.4.0",
-
"typescript": "^4.7.4",
+
"typescript": "^4.8.2",
"zen-observable": "^0.8.15"
}
}
+74
src/__tests__/combine.test.ts
···
+
import { Source } from '../types';
+
import { fromValue, makeSubject } from '../sources';
+
import { forEach } from '../sinks';
+
+
import {
+
passesPassivePull,
+
passesActivePush,
+
passesSinkClose,
+
passesSourceEnd,
+
passesSingleStart,
+
passesStrictEnd,
+
} from './compliance';
+
+
import { combine, zip } from '../combine';
+
+
beforeEach(() => {
+
jest.useFakeTimers();
+
});
+
+
describe('zip', () => {
+
const noop = (source: Source<any>) => zip([fromValue(0), source]);
+
+
passesPassivePull(noop, [0, 0]);
+
passesActivePush(noop, [0, 0]);
+
passesSinkClose(noop);
+
passesSourceEnd(noop, [0, 0]);
+
passesSingleStart(noop);
+
passesStrictEnd(noop);
+
+
it('emits the zipped values of two sources', () => {
+
const { source: sourceA, next: nextA } = makeSubject<number>();
+
const { source: sourceB, next: nextB } = makeSubject<number>();
+
const fn = jest.fn();
+
+
const combined = combine(sourceA, sourceB);
+
forEach(fn)(combined);
+
+
nextA(1);
+
expect(fn).not.toHaveBeenCalled();
+
nextB(2);
+
expect(fn).toHaveBeenCalledWith([1, 2]);
+
});
+
+
it('emits the zipped values of three sources', () => {
+
const { source: sourceA, next: nextA } = makeSubject<number>();
+
const { source: sourceB, next: nextB } = makeSubject<number>();
+
const { source: sourceC, next: nextC } = makeSubject<number>();
+
const fn = jest.fn();
+
+
const combined = zip([sourceA, sourceB, sourceC]);
+
forEach(fn)(combined);
+
+
nextA(1);
+
expect(fn).not.toHaveBeenCalled();
+
nextB(2);
+
expect(fn).not.toHaveBeenCalled();
+
nextC(3);
+
expect(fn).toHaveBeenCalledWith([1, 2, 3]);
+
});
+
+
it('emits the zipped values of a dictionary of two sources', () => {
+
const { source: sourceA, next: nextA } = makeSubject<number>();
+
const { source: sourceB, next: nextB } = makeSubject<number>();
+
const fn = jest.fn();
+
+
const combined = zip({ a: sourceA, b: sourceB });
+
forEach(fn)(combined);
+
+
nextA(1);
+
expect(fn).not.toHaveBeenCalled();
+
nextB(2);
+
expect(fn).toHaveBeenCalledWith({ a: 1, b: 2 });
+
});
+
});
-24
src/__tests__/operators.test.ts
···
jest.useFakeTimers();
});
-
describe('combine', () => {
-
const noop = (source: Source<any>) => operators.combine(sources.fromValue(0), source);
-
-
passesPassivePull(noop, [0, 0]);
-
passesActivePush(noop, [0, 0]);
-
passesSinkClose(noop);
-
passesSourceEnd(noop, [0, 0]);
-
passesSingleStart(noop);
-
passesStrictEnd(noop);
-
-
it('emits the zipped values of two sources', () => {
-
const { source: sourceA, next: nextA } = sources.makeSubject();
-
const { source: sourceB, next: nextB } = sources.makeSubject();
-
const fn = jest.fn();
-
-
sinks.forEach(fn)(operators.combine(sourceA, sourceB));
-
-
nextA(1);
-
expect(fn).not.toHaveBeenCalled();
-
nextB(2);
-
expect(fn).toHaveBeenCalledWith([1, 2]);
-
});
-
});
-
describe('buffer', () => {
const valueThenNever: Source<any> = sink =>
sink(
+82
src/combine.ts
···
+
import { Source, TypeOfSource, SignalKind, TalkbackKind, TalkbackFn } from './types';
+
import { push, start, talkbackPlaceholder } from './helpers';
+
+
type TypeOfSourceArray<T extends readonly [...any[]]> = T extends [infer Head, ...infer Tail]
+
? [TypeOfSource<Head>, ...TypeOfSourceArray<Tail>]
+
: [];
+
+
export function zip<Sources extends readonly [...Source<any>[]]>(
+
sources: [...Sources]
+
): Source<TypeOfSourceArray<Sources>>;
+
+
export function zip<Sources extends { [prop: string]: Source<any> }>(
+
sources: Sources
+
): Source<{ [Property in keyof Sources]: TypeOfSource<Sources[Property]> }>;
+
+
export function zip<T>(
+
sources: Source<T>[] | Record<string, Source<T>>
+
): Source<T[] | Record<string, T>> {
+
const size = Object.keys(sources).length;
+
return sink => {
+
const filled: Set<string | number> = new Set();
+
+
const talkbacks: TalkbackFn[] | Record<string, TalkbackFn | void> = Array.isArray(sources)
+
? new Array(size).fill(talkbackPlaceholder)
+
: {};
+
const buffer: T[] | Record<string, T> = Array.isArray(sources) ? new Array(size) : {};
+
+
let gotBuffer = false;
+
let gotSignal = false;
+
let ended = false;
+
let endCount = 0;
+
+
for (const key in sources) {
+
(sources[key] as Source<T>)(signal => {
+
if (signal === SignalKind.End) {
+
if (endCount >= size - 1) {
+
ended = true;
+
sink(SignalKind.End);
+
} else {
+
endCount++;
+
}
+
} else if (signal.tag === SignalKind.Start) {
+
talkbacks[key] = signal[0];
+
} else if (!ended) {
+
buffer[key] = signal[0];
+
filled.add(key);
+
if (!gotBuffer && filled.size < size) {
+
if (!gotSignal) {
+
for (const key in sources)
+
if (!filled.has(key)) (talkbacks[key] || talkbackPlaceholder)(TalkbackKind.Pull);
+
} else {
+
gotSignal = false;
+
}
+
} else {
+
gotBuffer = true;
+
gotSignal = false;
+
sink(push(Array.isArray(buffer) ? buffer.slice() : { ...buffer }));
+
}
+
}
+
});
+
}
+
sink(
+
start(signal => {
+
if (ended) {
+
/*noop*/
+
} else if (signal === TalkbackKind.Close) {
+
ended = true;
+
for (const key in talkbacks) talkbacks[key](TalkbackKind.Close);
+
} else if (!gotSignal) {
+
gotSignal = true;
+
for (const key in talkbacks) talkbacks[key](TalkbackKind.Pull);
+
}
+
})
+
);
+
};
+
}
+
+
export function combine<Sources extends Source<any>[]>(
+
...sources: Sources
+
): Source<TypeOfSourceArray<Sources>> {
+
return zip(sources);
+
}
+1
src/index.ts
···
export * from './sources';
export * from './operators';
export * from './sinks';
+
export * from './combine';
export * from './observable';
export * from './callbag';
export * from './pipe';
-73
src/operators.ts
···
};
}
-
export function combine<A, B>(sourceA: Source<A>, sourceB: Source<B>): Source<[A, B]> {
-
return sink => {
-
let lastValA: A | void;
-
let lastValB: B | void;
-
let talkbackA = talkbackPlaceholder;
-
let talkbackB = talkbackPlaceholder;
-
let gotSignal = false;
-
let gotEnd = false;
-
let ended = false;
-
sourceA(signal => {
-
if (signal === SignalKind.End) {
-
if (!gotEnd) {
-
gotEnd = true;
-
} else {
-
ended = true;
-
sink(SignalKind.End);
-
}
-
} else if (signal.tag === SignalKind.Start) {
-
talkbackA = signal[0];
-
} else if (lastValB === undefined) {
-
lastValA = signal[0];
-
if (!gotSignal) {
-
talkbackB(TalkbackKind.Pull);
-
} else {
-
gotSignal = false;
-
}
-
} else if (!ended) {
-
lastValA = signal[0];
-
gotSignal = false;
-
sink(push([lastValA, lastValB] as [A, B]));
-
}
-
});
-
sourceB(signal => {
-
if (signal === SignalKind.End) {
-
if (!gotEnd) {
-
gotEnd = true;
-
} else {
-
ended = true;
-
sink(SignalKind.End);
-
}
-
} else if (signal.tag === SignalKind.Start) {
-
talkbackB = signal[0];
-
} else if (lastValA === undefined) {
-
lastValB = signal[0];
-
if (!gotSignal) {
-
talkbackA(TalkbackKind.Pull);
-
} else {
-
gotSignal = false;
-
}
-
} else if (!ended) {
-
lastValB = signal[0];
-
gotSignal = false;
-
sink(push([lastValA, lastValB] as [A, B]));
-
}
-
});
-
sink(
-
start(signal => {
-
if (ended) {
-
/*noop*/
-
} else if (signal === TalkbackKind.Close) {
-
ended = true;
-
talkbackA(TalkbackKind.Close);
-
talkbackB(TalkbackKind.Close);
-
} else if (!gotSignal) {
-
gotSignal = true;
-
talkbackA(TalkbackKind.Pull);
-
talkbackB(TalkbackKind.Pull);
-
}
-
})
-
);
-
};
-
}
-
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
const inputQueue: In[] = [];
+3
src/types.ts
···
/** An operator transforms a [Source] and returns a new [Source], potentially with different timings or output types. */
export type Operator<In, Out> = (a: Source<In>) => Source<Out>;
+
/** Extracts the type of a given Source */
+
export type TypeOfSource<T> = T extends Source<infer U> ? U : never;
+
export interface Subscription {
unsubscribe(): void;
}
+4 -4
yarn.lock
···
resolved "https://registry.yarnpkg.com/typescript-compiler/-/typescript-compiler-1.4.1-2.tgz#ba4f7db22d91534a1929d90009dce161eb72fd3f"
integrity sha512-EMopKmoAEJqA4XXRFGOb7eSBhmQMbBahW6P1Koayeatp0b4AW2q/bBqYWkpG7QVQc9HGQUiS4trx2ZHcnAaZUg==
-
typescript@^4.7.4:
-
version "4.7.4"
-
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.7.4.tgz#1a88596d1cf47d59507a1bcdfb5b9dfe4d488235"
-
integrity sha512-C0WQT0gezHuw6AdY1M2jxUO83Rjf0HP7Sk1DtXj6j1EwkQNZrHAg2XPWlq62oqEhYvONq5pkC2Y9oPljWToLmQ==
+
typescript@^4.8.2:
+
version "4.8.2"
+
resolved "https://registry.yarnpkg.com/typescript/-/typescript-4.8.2.tgz#e3b33d5ccfb5914e4eeab6699cf208adee3fd790"
+
integrity sha512-C0I1UsrrDHo2fYI5oaCGbSejwX4ch+9Y5jTQELvovfmFkK3HHSZJB8MSJcWLmCUBzQBchCrZ9rMRV6GuNrvGtw==
typescript@~4.4.4:
version "4.4.4"