import * as deriving from './helpers/wonka_deriving'; import * as sources from './wonka_sources.gen'; import * as sinks from './wonka_sinks.gen'; import * as operators from './wonka_operators.gen'; import * as web from './web/wonkaJs.gen'; import * as types from './wonka_types.gen'; /* This tests a noop operator for passive Pull talkback signals. A Pull will be sent from the sink upwards and should pass through the operator until the source receives it, which then pushes a value down. */ const passesPassivePull = ( operator: types.operatorT, output: any = 0 ) => it('responds to Pull talkback signals (spec)', () => { let talkback = null; let push = 0; const values = []; const source: types.sourceT = sink => { sink(deriving.start(tb => { if (!push && tb === deriving.pull) { push++; sink(deriving.push(0)); } })); }; const sink: types.sinkT = signal => { expect(deriving.isEnd(signal)).toBeFalsy(); if (deriving.isPush(signal)) { values.push(deriving.unboxPush(signal)); } else if (deriving.isStart(signal)) { talkback = deriving.unboxStart(signal); } }; operator(source)(sink); // The Start signal should always come in immediately expect(talkback).not.toBe(null); // No Push signals should be issued initially expect(values).toEqual([]); // When pulling a value we expect an immediate response talkback(deriving.pull); jest.runAllTimers(); expect(values).toEqual([output]); }); /* This tests a noop operator for regular, active Push signals. A Push will be sent downwards from the source, through the operator to the sink. Pull events should be let through from the sink after every Push event. */ const passesActivePush = ( operator: types.operatorT, result: any = 0 ) => it('responds to eager Push signals (spec)', () => { const values = []; let talkback = null; let push = null; let pulls = 0; const source: types.sourceT = sink => { push = (value: any) => sink(deriving.push(value)); sink(deriving.start(tb => { if (tb === deriving.pull) pulls++; })); }; const sink: types.sinkT = signal => { expect(deriving.isEnd(signal)).toBeFalsy(); if (deriving.isStart(signal)) { talkback = deriving.unboxStart(signal); } else if (deriving.isPush(signal)) { values.push(deriving.unboxPush(signal)); talkback(deriving.pull); } }; operator(source)(sink); // No Pull signals should be issued initially expect(pulls).toBe(0); // When pushing a value we expect an immediate response push(0); jest.runAllTimers(); expect(values).toEqual([result]); // Subsequently the Pull signal should have travelled upwards expect(pulls).toBe(1); }); /* This tests a noop operator for Close talkback signals from the sink. A Close signal will be sent, which should be forwarded to the source, which then ends the communication without sending an End signal. */ const passesSinkClose = (operator: types.operatorT) => it('responds to Close signals from sink (spec)', () => { let talkback = null; let closing = 0; let pulls = 0; const source: types.sourceT = sink => { sink(deriving.start(tb => { if (tb === deriving.pull) { pulls++; if (!closing) sink(deriving.push(0)); } if (tb === deriving.close) { closing++; } })); }; const sink: types.sinkT = signal => { expect(deriving.isEnd(signal)).toBeFalsy(); if (deriving.isStart(signal)) { talkback = deriving.unboxStart(signal); } else if (deriving.isPush(signal)) { talkback(deriving.close); } }; operator(source)(sink); // When pushing a value we expect an immediate close signal talkback(deriving.pull); jest.runAllTimers(); expect(closing).toBe(1); expect(pulls).toBe(1); }); /* This tests a noop operator for End signals from the source. A Push and End signal will be sent after the first Pull talkback signal from the sink, which shouldn't lead to any extra Close or Pull talkback signals. */ const passesSourceEnd = ( operator: types.operatorT, result: any = 0 ) => it('passes on End signals from source (spec)', () => { const signals = []; let talkback = null; let pulls = 0; let ending = 0; const source: types.sourceT = sink => { sink(deriving.start(tb => { expect(tb).not.toBe(deriving.close); if (tb === deriving.pull) pulls++; if (pulls === 1) { sink(deriving.push(0)); sink(deriving.end()); } })); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) { talkback = deriving.unboxStart(signal); } else { signals.push(signal); if (deriving.isEnd(signal)) ending++; } }; operator(source)(sink); // When pushing a value we expect an immediate Push then End signal talkback(deriving.pull); jest.runAllTimers(); expect(pulls).toBe(1); expect(ending).toBe(1); expect(signals).toEqual([deriving.push(result), deriving.end()]); }); /* This tests a noop operator for Start signals from the source. When the operator's sink is started by the source it'll receive a Start event. As a response it should never send more than one Start signals to the sink. */ const passesSingleStart = (operator: types.operatorT) => it('sends a single Start event to the incoming sink (spec)', () => { let start = 0; const source: types.sourceT = sink => { sink(deriving.start(() => {})); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) start++; }; // When starting the operator we expect a single start event on the sink operator(source)(sink); expect(start).toBe(1); }); /* This tests a noop operator for silence after End signals from the source. When the operator receives the End signal it shouldn't forward any other signals to the sink anymore. This isn't a strict requirement, but some operators should ensure that all sources are well behaved. This is particularly true for operators that either Close sources themselves or may operate on multiple sources. */ const passesStrictEnd = (operator: types.operatorT) => { it('stops all signals after End has been received (spec: strict end)', () => { let pulls = 0; const signals = []; const source: types.sourceT = sink => { sink(deriving.start(tb => { if (tb === deriving.pull) { pulls++; sink(deriving.end()); sink(deriving.push(123)); } })); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) { deriving.unboxStart(signal)(deriving.pull); } else { signals.push(signal); } }; operator(source)(sink); // The Push signal should've been dropped jest.runAllTimers(); expect(signals).toEqual([deriving.end()]); expect(pulls).toBe(1); }); it('stops all signals after Close has been received (spec: strict close)', () => { const signals = []; const source: types.sourceT = sink => { sink(deriving.start(tb => { if (tb === deriving.close) { sink(deriving.push(123)); } })); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) { deriving.unboxStart(signal)(deriving.close); } else { signals.push(signal); } }; operator(source)(sink); // The Push signal should've been dropped jest.runAllTimers(); expect(signals).toEqual([]); }); }; /* This tests an immediately closing operator for End signals to the sink and Close signals to the source. When an operator closes immediately we expect to see a Close signal at the source and an End signal to the sink, since the closing operator is expected to end the entire chain. */ const passesCloseAndEnd = (closingOperator: types.operatorT) => it('closes the source and ends the sink correctly (spec: ending operator)', () => { let closing = 0; let ending = 0; const source: types.sourceT = sink => { sink(deriving.start(tb => { // For some operator tests we do need to send a single value if (tb === deriving.pull) sink(deriving.push(null)); if (tb === deriving.close) closing++; })); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) { deriving.unboxStart(signal)(deriving.pull); } if (deriving.isEnd(signal)) { ending++; } }; // We expect the operator to immediately end and close closingOperator(source)(sink); expect(closing).toBe(1); expect(ending).toBe(1); }); const passesAsyncSequence = ( operator: types.operatorT, result: any = 0 ) => it('passes an async push with an async end (spec)', () => { let hasPushed = false; const signals = []; const source: types.sourceT = sink => { sink(deriving.start(tb => { if (tb === deriving.pull && !hasPushed) { hasPushed = true; setTimeout(() => sink(deriving.push(0)), 10); setTimeout(() => sink(deriving.end()), 20); } })); }; const sink: types.sinkT = signal => { if (deriving.isStart(signal)) { setTimeout(() => { deriving.unboxStart(signal)(deriving.pull); }, 5); } else { signals.push(signal); } }; // We initially expect to see the push signal // Afterwards after all timers all other signals come in operator(source)(sink); expect(signals.length).toBe(0); jest.advanceTimersByTime(5); expect(hasPushed).toBeTruthy(); jest.runAllTimers(); expect(signals).toEqual([ deriving.push(result), deriving.end() ]); }); beforeEach(() => { jest.useFakeTimers(); }); describe('combine', () => { const noop = (source: types.sourceT) => operators.combine(sources.fromValue(0), source); passesPassivePull(noop, [0, 0]); passesActivePush(noop, [0, 0]); passesSinkClose(noop); passesSourceEnd(noop, [0, 0]); passesSingleStart(noop); passesStrictEnd(noop); it('emits the zipped values of two sources', () => { const { source: sourceA, next: nextA } = sources.makeSubject(); const { source: sourceB, next: nextB } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.combine(sourceA, sourceB)); nextA(1); expect(fn).not.toHaveBeenCalled(); nextB(2); expect(fn).toHaveBeenCalledWith([1, 2]); }); }); describe('buffer', () => { const valueThenNever: types.sourceT = sink => sink(deriving.start(tb => { if (tb === deriving.pull) sink(deriving.push(null)); })); const noop = operators.buffer(valueThenNever); passesPassivePull(noop, [0]); passesActivePush(noop, [0]); passesSinkClose(noop); passesSourceEnd(noop, [0]); passesSingleStart(noop); passesStrictEnd(noop); it('emits batches of input values when a notifier emits', () => { const { source: notifier$, next: notify } = sources.makeSubject(); const { source: input$, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.buffer(notifier$)(input$)); next(1); next(2); expect(fn).not.toHaveBeenCalled(); notify(null); expect(fn).toHaveBeenCalledWith([1, 2]); next(3); notify(null); expect(fn).toHaveBeenCalledWith([3]); }); }); describe('concatMap', () => { const noop = operators.concatMap(x => sources.fromValue(x)); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); // This synchronous test for concatMap will behave the same as mergeMap & switchMap it('emits values from each flattened synchronous source', () => { const { source, next, complete } = sources.makeSubject(); const fn = jest.fn(); operators.concatMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); next(1); next(3); complete(); expect(fn).toHaveBeenCalledTimes(6); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], [deriving.push(2)], [deriving.push(3)], [deriving.push(4)], [deriving.end()], ]); }); // This asynchronous test for concatMap will behave differently than mergeMap & switchMap it('emits values from each flattened asynchronous source, one at a time', () => { const source = web.delay(4)(sources.fromArray([1, 10])); const fn = jest.fn(); sinks.forEach(fn)( operators.concatMap((x: number) => { return web.delay(5)(sources.fromArray([x, x * 2])); })(source) ); jest.advanceTimersByTime(14); expect(fn.mock.calls).toEqual([ [1], [2], ]); jest.runAllTimers(); expect(fn.mock.calls).toEqual([ [1], [2], [10], [20], ]); }); it('emits synchronous values in order', () => { const values = []; sinks.forEach(x => values.push(x))( operators.concat([ sources.fromArray([1, 2]), sources.fromArray([3, 4]) ]) ); expect(values).toEqual([ 1, 2, 3, 4 ]); }); }); describe('debounce', () => { const noop = web.debounce(() => 0); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); it('waits for a specified amount of silence before emitting the last value', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(web.debounce(() => 100)(source)); next(1); jest.advanceTimersByTime(50); expect(fn).not.toHaveBeenCalled(); next(2); jest.advanceTimersByTime(99); expect(fn).not.toHaveBeenCalled(); jest.advanceTimersByTime(1); expect(fn).toHaveBeenCalledWith(2); }); it('emits debounced value with delayed End signal', () => { const { source, next, complete } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(web.debounce(() => 100)(source)); next(1); complete(); jest.advanceTimersByTime(100); expect(fn).toHaveBeenCalled(); }); }); describe('delay', () => { const noop = web.delay(0); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('delays outputs by a specified delay timeout value', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(web.delay(100)(source)); next(1); expect(fn).not.toHaveBeenCalled(); jest.advanceTimersByTime(100); expect(fn).toHaveBeenCalledWith(1); }); }); describe('filter', () => { const noop = operators.filter(() => true); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('prevents emissions for which a predicate fails', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.filter(x => !!x)(source)); next(false); expect(fn).not.toHaveBeenCalled(); next(true); expect(fn).toHaveBeenCalledWith(true); }); }); describe('map', () => { const noop = operators.map(x => x); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('maps over values given a transform function', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.map((x: number) => x + 1)(source)); next(1); expect(fn).toHaveBeenCalledWith(2); }); }); describe('mergeMap', () => { const noop = operators.mergeMap(x => sources.fromValue(x)); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); // This synchronous test for mergeMap will behave the same as concatMap & switchMap it('emits values from each flattened synchronous source', () => { const { source, next, complete } = sources.makeSubject(); const fn = jest.fn(); operators.mergeMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); next(1); next(3); complete(); expect(fn).toHaveBeenCalledTimes(6); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], [deriving.push(2)], [deriving.push(3)], [deriving.push(4)], [deriving.end()], ]); }); // This asynchronous test for mergeMap will behave differently than concatMap & switchMap it('emits values from each flattened asynchronous source simultaneously', () => { const source = web.delay(4)(sources.fromArray([1, 10])); const fn = jest.fn(); sinks.forEach(fn)( operators.mergeMap((x: number) => { return web.delay(5)(sources.fromArray([x, x * 2])); })(source) ); jest.runAllTimers(); expect(fn.mock.calls).toEqual([ [1], [2], [10], [20], ]); }); it('emits synchronous values in order', () => { const values = []; sinks.forEach(x => values.push(x))( operators.merge([ sources.fromArray([1, 2]), sources.fromArray([3, 4]) ]) ); expect(values).toEqual([ 1, 2, 3, 4 ]); }); }); describe('onEnd', () => { const noop = operators.onEnd(() => {}); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('calls a callback when the source ends', () => { const { source, next, complete } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(() => {})(operators.onEnd(fn)(source)); next(null); expect(fn).not.toHaveBeenCalled(); complete(); expect(fn).toHaveBeenCalled(); }); }); describe('onPush', () => { const noop = operators.onPush(() => {}); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('calls a callback when the source emits', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(() => {})(operators.onPush(fn)(source)); next(1); expect(fn).toHaveBeenCalledWith(1); next(2); expect(fn).toHaveBeenCalledWith(2); }); it('is the same as `tap`', () => { expect(operators.onPush).toBe(operators.tap); }); }); describe('onStart', () => { const noop = operators.onStart(() => {}); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('is called when the source starts', () => { let sink: types.sinkT; const fn = jest.fn(); const source: types.sourceT = _sink => { sink = _sink; }; sinks.forEach(() => {})(operators.onStart(fn)(source)); expect(fn).not.toHaveBeenCalled(); sink(deriving.start(() => {})); expect(fn).toHaveBeenCalled(); }); }); describe('sample', () => { const valueThenNever: types.sourceT = sink => sink(deriving.start(tb => { if (tb === deriving.pull) sink(deriving.push(null)); })); const noop = operators.sample(valueThenNever); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); it('emits the latest value when a notifier source emits', () => { const { source: notifier$, next: notify } = sources.makeSubject(); const { source: input$, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.sample(notifier$)(input$)); next(1); next(2); expect(fn).not.toHaveBeenCalled(); notify(null); expect(fn).toHaveBeenCalledWith(2); }); }); describe('scan', () => { const noop = operators.scan((_acc, x) => x, null); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('folds values continuously with a reducer and initial value', () => { const { source: input$, next } = sources.makeSubject(); const fn = jest.fn(); const reducer = (acc: number, x: number) => acc + x; sinks.forEach(fn)(operators.scan(reducer, 0)(input$)); next(1); expect(fn).toHaveBeenCalledWith(1); next(2); expect(fn).toHaveBeenCalledWith(3); }); }); describe('share', () => { const noop = operators.share; passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); it('shares output values between sinks', () => { let push = () => {}; const source: types.sourceT = operators.share(sink => { sink(deriving.start(() => {})); push = () => { sink(deriving.push([0])); sink(deriving.end()); }; }); const fnA = jest.fn(); const fnB = jest.fn(); sinks.forEach(fnA)(source); sinks.forEach(fnB)(source); push(); expect(fnA).toHaveBeenCalledWith([0]); expect(fnB).toHaveBeenCalledWith([0]); expect(fnA.mock.calls[0][0]).toBe(fnB.mock.calls[0][0]); }); }); describe('skip', () => { const noop = operators.skip(0); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('skips a number of values before emitting normally', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.skip(1)(source)); next(1); expect(fn).not.toHaveBeenCalled(); next(2); expect(fn).toHaveBeenCalledWith(2); }); }); describe('skipUntil', () => { const noop = operators.skipUntil(sources.fromValue(null)); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); passesStrictEnd(noop); it('skips values until the notifier source emits', () => { const { source: notifier$, next: notify } = sources.makeSubject(); const { source: input$, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.skipUntil(notifier$)(input$)); next(1); expect(fn).not.toHaveBeenCalled(); notify(null); next(2); expect(fn).toHaveBeenCalledWith(2); }); }); describe('skipWhile', () => { const noop = operators.skipWhile(() => false); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('skips values until one fails a predicate', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(operators.skipWhile(x => x <= 1)(source)); next(1); expect(fn).not.toHaveBeenCalled(); next(2); expect(fn).toHaveBeenCalledWith(2); }); }); describe('switchMap', () => { const noop = operators.switchMap(x => sources.fromValue(x)); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); // This synchronous test for switchMap will behave the same as concatMap & mergeMap it('emits values from each flattened synchronous source', () => { const { source, next, complete } = sources.makeSubject(); const fn = jest.fn(); operators.switchMap((x: number) => sources.fromArray([x, x + 1]))(source)(fn); next(1); next(3); complete(); expect(fn).toHaveBeenCalledTimes(6); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], [deriving.push(2)], [deriving.push(3)], [deriving.push(4)], [deriving.end()], ]); }); // This asynchronous test for switchMap will behave differently than concatMap & mergeMap it('emits values from each flattened asynchronous source, one at a time', () => { const source = web.delay(4)(sources.fromArray([1, 10])); const fn = jest.fn(); sinks.forEach(fn)( operators.switchMap((x: number) => ( operators.take(2)(operators.map((y: number) => x * (y + 1))(web.interval(5))) ))(source) ); jest.runAllTimers(); expect(fn.mock.calls).toEqual([ [1], [10], [20], ]); }); }); describe('take', () => { const noop = operators.take(10); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); passesCloseAndEnd(operators.take(0)); it('emits values until a maximum is reached', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); operators.take(1)(source)(fn); next(1); expect(fn).toHaveBeenCalledTimes(3); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], [deriving.end()], ]); }); }); describe('takeUntil', () => { const noop = operators.takeUntil(sources.never); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesStrictEnd(noop); passesAsyncSequence(noop); const ending = operators.takeUntil(sources.fromValue(null)); passesCloseAndEnd(ending); it('emits values until a notifier emits', () => { const { source: notifier$, next: notify } = sources.makeSubject(); const { source: input$, next } = sources.makeSubject(); const fn = jest.fn(); operators.takeUntil(notifier$)(input$)(fn); next(1); expect(fn).toHaveBeenCalledTimes(2); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], ]); notify(null); expect(fn).toHaveBeenCalledTimes(3); expect(fn.mock.calls[2][0]).toEqual(deriving.end()); }); }); describe('takeWhile', () => { const noop = operators.takeWhile(() => true); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); const ending = operators.takeWhile(() => false); passesCloseAndEnd(ending); it('emits values while a predicate passes for all values', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); operators.takeWhile(x => x < 2)(source)(fn); next(1); next(2); expect(fn.mock.calls).toEqual([ [deriving.start(expect.any(Function))], [deriving.push(1)], [deriving.end()], ]); }); }); describe('takeLast', () => { passesCloseAndEnd(operators.takeLast(0)); it('emits the last max values of an ended source', () => { const { source, next, complete } = sources.makeSubject(); const values = []; let talkback; operators.takeLast(1)(source)(signal => { values.push(signal); if (deriving.isStart(signal)) talkback = deriving.unboxStart(signal); if (!deriving.isEnd(signal)) talkback(deriving.pull); }); next(1); next(2); expect(values.length).toBe(0); complete(); expect(values).toEqual([ deriving.start(expect.any(Function)), deriving.push(2), deriving.end(), ]); }); }); describe('throttle', () => { const noop = web.throttle(() => 0); passesPassivePull(noop); passesActivePush(noop); passesSinkClose(noop); passesSourceEnd(noop); passesSingleStart(noop); passesAsyncSequence(noop); it('should ignore emissions for a period of time after a value', () => { const { source, next } = sources.makeSubject(); const fn = jest.fn(); sinks.forEach(fn)(web.throttle(() => 100)(source)); next(1); expect(fn).toHaveBeenCalledWith(1); jest.advanceTimersByTime(50); next(2); expect(fn).toHaveBeenCalledTimes(1); jest.advanceTimersByTime(50); next(3); expect(fn).toHaveBeenCalledWith(3); }); });