Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Add missing Close signal to takeUntil operator (#128)

Changed files
+53
src
+52
src/__tests__/operators.test.ts
···
expect(fnB).toHaveBeenCalledWith([0]);
expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
});
+
+
it('completes the source when no more sink is listening', () => {
+
let onPush = () => {};
+
+
const talkback = vi.fn();
+
const source: Source<any> = operators.share(sink => {
+
sink(start(talkback));
+
onPush = () => {
+
sink(push([0]));
+
sink(push([1]));
+
sink(SignalKind.End);
+
};
+
});
+
+
const fnA = vi.fn();
+
const fnB = vi.fn();
+
+
sinks.forEach(fnA)(operators.take(1)(source));
+
sinks.forEach(fnB)(operators.take(1)(source));
+
onPush();
+
+
expect(fnA).toHaveBeenCalledWith([0]);
+
expect(fnB).toHaveBeenCalledWith([0]);
+
expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]);
+
expect(talkback).toHaveBeenCalledWith(TalkbackKind.Close);
+
});
});
describe('skip', () => {
···
notify(null);
expect(fn).toHaveBeenCalledTimes(3);
expect(fn.mock.calls[2][0]).toEqual(SignalKind.End);
+
});
+
+
it('emits values until a notifier emits', () => {
+
const { source: input$, next } = sources.makeSubject<number>();
+
const fn = vi.fn();
+
+
let hasClosed = false;
+
+
operators.takeUntil(sink => {
+
sink(
+
start(talkback => {
+
if (talkback === TalkbackKind.Close) {
+
hasClosed = true;
+
} else if (talkback === TalkbackKind.Pull && !hasClosed) {
+
sink(push(1));
+
}
+
})
+
);
+
})(input$)(fn);
+
+
next(1);
+
+
expect(fn).toHaveBeenCalledTimes(2);
+
expect(fn.mock.calls).toEqual([[0], [start(expect.any(Function))]]);
+
+
expect(hasClosed).toBe(true);
});
});
+1
src/operators.ts
···
(notifierTalkback = signal[0])(TalkbackKind.Pull);
} else {
ended = true;
+
notifierTalkback(TalkbackKind.Close);
sourceTalkback(TalkbackKind.Close);
sink(SignalKind.End);
}