Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Apply eslint/prettier fixes

+2 -2
gatsby-config.js
···
title: 'Wonka',
description: 'A fast push & pull stream library for Reason, OCaml, and TypeScript',
siteUrl: 'https://wonka.kitten.sh',
-
githubUrl: 'https://github.com/kitten/wonka'
+
githubUrl: 'https://github.com/kitten/wonka',
},
plugins: ['gatsby-plugin-netlify'],
-
__experimentalThemes: ['gatsby-theme-docs-system']
+
__experimentalThemes: ['gatsby-theme-docs-system'],
};
+4 -4
gatsby/theme.js
···
'Apple Color Emoji',
'Segoe UI Emoji',
'Segoe UI Symbol',
-
'Noto Color Emoji'
+
'Noto Color Emoji',
];
export const fonts = {
header: ['phantom-sans', ...systemFonts],
code: ['space-mono', 'monospace'],
-
body: systemFonts
+
body: systemFonts,
};
export const colors = {
···
fg: '#36313d',
fgHeading: '#000000',
fgPassive: '#78757a',
-
fgActive: '#f5735f'
+
fgActive: '#f5735f',
};
export const prismTheme = nightOwlLight;
···
export const borders = [
`${borderWidths[0]} solid ${colors.bgPassive}`,
`${borderWidths[1]} solid ${colors.bgPassive}`,
-
`${borderWidths[2]} solid ${colors.bgPassive}`
+
`${borderWidths[2]} solid ${colors.bgPassive}`,
];
+1 -1
gatsby/typography.js
···
headerFontFamily: theme.fonts.header,
headerWeight: '500',
bodyFontFamily: theme.fonts.body,
-
bodyWeight: '400'
+
bodyWeight: '400',
});
export default typography;
+7 -13
rollup.config.js
···
typescript({
typescript: require('typescript'),
-
exclude: [
-
'src/**/*.test.ts',
-
'**/__tests__/*',
-
],
+
exclude: ['src/**/*.test.ts', '**/__tests__/*'],
compilerOptions: {
sourceMap: true,
noEmit: false,
···
templateString: false,
objectRestSpread: false,
},
-
exclude: 'node_modules/**'
+
exclude: 'node_modules/**',
}),
terser({
···
sequences: false,
loops: false,
conditionals: false,
-
join_vars: false
+
join_vars: false,
},
mangle: {
module: true,
···
output: {
beautify: true,
braces: true,
-
indent_level: 2
-
}
+
indent_level: 2,
+
},
}),
];
-
const output = (format) => {
+
const output = format => {
const extension = format === 'esm' ? '.mjs' : '.js';
return {
chunkFileNames: '[hash]' + extension,
···
tryCatchDeoptimization: false,
moduleSideEffects: false,
},
-
output: [
-
output('esm'),
-
output('cjs'),
-
],
+
output: [output('esm'), output('cjs')],
};
export default config;
+4 -6
scripts/eslint-preset.js
···
'sort-keys': 'off',
'no-console': ['error', { allow: ['warn', 'error'] }],
'prefer-arrow/prefer-arrow-functions': 'off',
+
'prefer-rest-params': 'off',
'prettier/prettier': [
'error',
···
'@typescript-eslint/no-misused-new': 'off',
'@typescript-eslint/no-explicit-any': 'off',
'@typescript-eslint/array-type': 'off',
-
'@typescript-eslint/no-unused-vars': [
-
'error',
-
{
-
argsIgnorePattern: '^_',
-
},
-
],
+
'@typescript-eslint/no-empty-function': 'off',
+
'@typescript-eslint/no-unused-vars': 'off',
+
'prefer-rest-params': 'off',
},
},
],
+2 -2
scripts/flow-typings-plugin.js
···
function flowTypings() {
return {
-
name: "flow-typings",
+
name: 'flow-typings',
async writeBundle() {
const cwd = process.cwd();
for (const file of glob('dist/types/**/*.d.ts')) {
···
const definition = flowdef.replace(/import/g, 'import type');
writeFileSync(newpath, '// @flow\n\n' + definition);
}
-
}
+
},
};
}
+122 -164
src/__tests__/operators.test.ts
···
A Pull will be sent from the sink upwards and should pass through
the operator until the source receives it, which then pushes a
value down. */
-
const passesPassivePull = (
-
operator: Operator<any, any>,
-
output: any = 0
-
) => {
+
const passesPassivePull = (operator: Operator<any, any>, output: any = 0) => {
it('responds to Pull talkback signals (spec)', () => {
let talkback: TalkbackFn | null = null;
let pushes = 0;
const values: any[] = [];
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
if (!pushes && signal === TalkbackKind.Pull) {
-
pushes++;
-
sink(push(0));
-
}
-
}));
+
sink(
+
start(signal => {
+
if (!pushes && signal === TalkbackKind.Pull) {
+
pushes++;
+
sink(push(0));
+
}
+
})
+
);
};
-
const sink: Sink<any> = (signal) => {
+
const sink: Sink<any> = signal => {
expect(signal).not.toBe(SignalKind.End);
if (signal === SignalKind.End) {
/*noop*/
···
A Push will be sent downwards from the source, through the
operator to the sink. Pull events should be let through from
the sink after every Push event. */
-
const passesActivePush = (
-
operator: Operator<any, any>,
-
result: any = 0
-
) => {
+
const passesActivePush = (operator: Operator<any, any>, result: any = 0) => {
it('responds to eager Push signals (spec)', () => {
const values: any[] = [];
let talkback: TalkbackFn | null = null;
let sink: Sink<any> | null = null;
let pulls = 0;
-
const source: Source<any> = (_sink) => {
-
(sink = _sink)(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
pulls++;
-
}));
+
const source: Source<any> = _sink => {
+
(sink = _sink)(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
};
-
operator(source)((signal) => {
+
operator(source)(signal => {
expect(signal).not.toBe(SignalKind.End);
if (signal === SignalKind.End) {
/*noop*/
···
let closing = 0;
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull && !closing) {
-
sink(push(0));
-
} else if (signal === TalkbackKind.Close) {
-
closing++;
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull && !closing) {
+
sink(push(0));
+
} else if (signal === TalkbackKind.Close) {
+
closing++;
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
A Push and End signal will be sent after the first Pull talkback
signal from the sink, which shouldn't lead to any extra Close or Pull
talkback signals. */
-
const passesSourceEnd = (
-
operator: Operator<any, any>,
-
result: any = 0
-
) => {
+
const passesSourceEnd = (operator: Operator<any, any>, result: any = 0) => {
it('passes on immediate Push then End signals from source (spec)', () => {
const signals: Signal<any>[] = [];
let talkback: TalkbackFn | null = null;
···
let ending = 0;
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
expect(signal).not.toBe(TalkbackKind.Close);
-
if (signal === TalkbackKind.Pull) {
-
pulls++;
-
if (pulls === 1) {
-
sink(push(0));
-
sink(SignalKind.End);
+
sink(
+
start(signal => {
+
expect(signal).not.toBe(TalkbackKind.Close);
+
if (signal === TalkbackKind.Pull) {
+
pulls++;
+
if (pulls === 1) {
+
sink(push(0));
+
sink(SignalKind.End);
+
}
}
-
}
-
}));
+
})
+
);
};
const sink: Sink<any> = signal => {
···
after the first pull in response to another.
This is similar to passesSourceEnd but more well behaved since
mergeMap/switchMap/concatMap are eager operators. */
-
const passesSourcePushThenEnd = (
-
operator: Operator<any, any>,
-
result: any = 0
-
) => {
+
const passesSourcePushThenEnd = (operator: Operator<any, any>, result: any = 0) => {
it('passes on End signals from source (spec)', () => {
const signals: Signal<any>[] = [];
let talkback: TalkbackFn | null = null;
···
let ending = 0;
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
expect(signal).not.toBe(TalkbackKind.Close);
-
if (signal === TalkbackKind.Pull) {
-
pulls++;
-
if (pulls <= 2) { sink(push(0)); }
-
else { sink(SignalKind.End); }
-
}
-
}));
+
sink(
+
start(signal => {
+
expect(signal).not.toBe(TalkbackKind.Close);
+
if (signal === TalkbackKind.Pull) {
+
pulls++;
+
if (pulls <= 2) {
+
sink(push(0));
+
} else {
+
sink(SignalKind.End);
+
}
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
jest.runAllTimers();
expect(ending).toBe(1);
expect(pulls).toBe(3);
-
expect(signals).toEqual([
-
push(result),
-
push(result),
-
SignalKind.End
-
]);
+
expect(signals).toEqual([push(result), push(result), SignalKind.End]);
});
};
···
const signals: Signal<any>[] = [];
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
pulls++;
-
sink(SignalKind.End);
-
sink(push(123));
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
pulls++;
+
sink(SignalKind.End);
+
sink(push(123));
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
const signals: Signal<any>[] = [];
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
sink(push(123));
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
sink(push(123));
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
let ending = 0;
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
// For some operator tests we do need to send a single value
-
if (signal === TalkbackKind.Pull) {
-
sink(push(null));
-
} else {
-
closing++;
-
}
-
}));
+
sink(
+
start(signal => {
+
// For some operator tests we do need to send a single value
+
if (signal === TalkbackKind.Pull) {
+
sink(push(null));
+
} else {
+
closing++;
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
});
};
-
const passesAsyncSequence = (
-
operator: Operator<any, any>,
-
result: any = 0
-
) => {
+
const passesAsyncSequence = (operator: Operator<any, any>, result: any = 0) => {
it('passes an async push with an async end (spec)', () => {
let hasPushed = false;
const signals: Signal<any>[] = [];
const source: Source<any> = sink => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull && !hasPushed) {
-
hasPushed = true;
-
setTimeout(() => sink(push(0)), 10);
-
setTimeout(() => sink(SignalKind.End), 20);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull && !hasPushed) {
+
hasPushed = true;
+
setTimeout(() => sink(push(0)), 10);
+
setTimeout(() => sink(SignalKind.End), 20);
+
}
+
})
+
);
};
const sink: Sink<any> = signal => {
···
expect(hasPushed).toBeTruthy();
jest.runAllTimers();
-
expect(signals).toEqual([
-
push(result),
-
SignalKind.End
-
]);
+
expect(signals).toEqual([push(result), SignalKind.End]);
});
};
···
describe('buffer', () => {
const valueThenNever: Source<any> = sink =>
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
sink(push(null));
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) sink(push(null));
+
})
+
);
const noop = operators.buffer(valueThenNever);
···
})(sources.fromValue(null))(fn);
expect(teardown).toHaveBeenCalled();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
});
// This asynchronous test for concatMap will behave differently than mergeMap & switchMap
···
);
jest.advanceTimersByTime(14);
-
expect(fn.mock.calls).toEqual([
-
[1],
-
[2],
-
]);
+
expect(fn.mock.calls).toEqual([[1], [2]]);
jest.runAllTimers();
-
expect(fn.mock.calls).toEqual([
-
[1],
-
[2],
-
[10],
-
[20],
-
]);
+
expect(fn.mock.calls).toEqual([[1], [2], [10], [20]]);
});
it('works for fully asynchronous sources', () => {
···
return sources.make(observer => {
setTimeout(() => observer.next(1));
return () => {};
-
})
+
});
})(sources.fromValue(null))
);
···
const values: any[] = [];
sinks.forEach(x => values.push(x))(
-
operators.concat([
-
sources.fromArray([1, 2]),
-
sources.fromArray([3, 4])
-
])
+
operators.concat([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
);
-
expect(values).toEqual([ 1, 2, 3, 4 ]);
+
expect(values).toEqual([1, 2, 3, 4]);
});
});
···
})(sources.fromValue(null))(fn);
expect(teardown).toHaveBeenCalled();
-
expect(values).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(values).toEqual([start(expect.any(Function))]);
});
// This asynchronous test for mergeMap will behave differently than concatMap & switchMap
···
);
jest.runAllTimers();
-
expect(fn.mock.calls).toEqual([
-
[1],
-
[10],
-
[2],
-
[20],
-
]);
+
expect(fn.mock.calls).toEqual([[1], [10], [2], [20]]);
});
it('emits synchronous values in order', () => {
const values: any[] = [];
sinks.forEach(x => values.push(x))(
-
operators.merge([
-
sources.fromArray([1, 2]),
-
sources.fromArray([3, 4])
-
])
+
operators.merge([sources.fromArray([1, 2]), sources.fromArray([3, 4])])
);
-
expect(values).toEqual([ 1, 2, 3, 4 ]);
+
expect(values).toEqual([1, 2, 3, 4]);
});
});
···
let sink: Sink<any>;
const fn = jest.fn();
-
const source: Source<any> = _sink => { sink = _sink; };
+
const source: Source<any> = _sink => {
+
sink = _sink;
+
};
sinks.forEach(() => {})(operators.onStart(fn)(source));
···
describe('sample', () => {
const valueThenNever: Source<any> = sink =>
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
sink(push(null));
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) sink(push(null));
+
})
+
);
const noop = operators.sample(valueThenNever);
···
})(sources.fromValue(null))(fn);
expect(teardown).toHaveBeenCalled();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
});
// This asynchronous test for switchMap will behave differently than concatMap & mergeMap
···
const fn = jest.fn();
sinks.forEach(fn)(
-
operators.switchMap((x: number) => (
+
operators.switchMap((x: number) =>
operators.take(2)(operators.map((y: number) => x * (y + 1))(sources.interval(5)))
-
))(source)
+
)(source)
);
jest.runAllTimers();
-
expect(fn.mock.calls).toEqual([
-
[1],
-
[10],
-
[20],
-
]);
+
expect(fn.mock.calls).toEqual([[1], [10], [20]]);
});
});
···
next(1);
expect(fn).toHaveBeenCalledTimes(3);
-
expect(fn.mock.calls).toEqual([
-
[start(expect.any(Function))],
-
[push(1)],
-
[SignalKind.End],
-
]);
+
expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
});
});
···
next(1);
expect(fn).toHaveBeenCalledTimes(2);
-
expect(fn.mock.calls).toEqual([
-
[start(expect.any(Function))],
-
[push(1)],
-
]);
+
expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)]]);
notify(null);
expect(fn).toHaveBeenCalledTimes(3);
···
next(1);
next(2);
-
expect(fn.mock.calls).toEqual([
-
[start(expect.any(Function))],
-
[push(1)],
-
[SignalKind.End],
-
]);
+
expect(fn.mock.calls).toEqual([[start(expect.any(Function))], [push(1)], [SignalKind.End]]);
});
});
···
expect(signals.length).toBe(0);
complete();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(2),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(2), SignalKind.End]);
});
});
+121 -99
src/__tests__/sinks.test.ts
···
let pulls = 0;
const fn = jest.fn();
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
if (pulls < 3) {
-
pulls++;
-
sink(push(0));
-
} else {
-
sink(SignalKind.End);
-
expect(pulls).toBe(3);
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
if (pulls < 3) {
+
pulls++;
+
sink(push(0));
+
} else {
+
sink(SignalKind.End);
+
expect(pulls).toBe(3);
+
}
}
-
}
-
}));
+
})
+
);
};
sinks.subscribe(fn)(source);
···
let pulls = 0;
let closing = 0;
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
if (!pulls) {
-
pulls++;
-
sink(push(0));
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
if (!pulls) {
+
pulls++;
+
sink(push(0));
+
}
+
} else {
+
closing++;
}
-
} else {
-
closing++;
-
}
-
}));
+
})
+
);
};
const sub = sinks.subscribe(() => {})(source);
···
let pulls = 0;
let closing = 0;
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
pulls++;
-
sink(SignalKind.End);
-
} else {
-
closing++;
-
}
-
}));
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
pulls++;
+
sink(SignalKind.End);
+
} else {
+
closing++;
+
}
+
})
+
);
};
const sub = sinks.subscribe(() => {})(source);
···
it('ignores Push signals after the source has ended', () => {
const fn = jest.fn();
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
sink(SignalKind.End);
-
sink(push(0));
-
}
-
}));
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
sink(SignalKind.End);
+
sink(push(0));
+
}
+
})
+
);
};
sinks.subscribe(fn)(source);
···
it('ignores Push signals after cancellation', () => {
const fn = jest.fn();
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
sink(push(0));
-
}
-
}));
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
sink(push(0));
+
}
+
})
+
);
};
sinks.subscribe(fn)(source).unsubscribe();
···
describe('publish', () => {
it('sends Pull talkback signals every Push signal', () => {
let pulls = 0;
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
if (pulls < 3) {
-
pulls++;
-
sink(push(0));
-
} else {
-
sink(SignalKind.End);
-
expect(pulls).toBe(3);
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
if (pulls < 3) {
+
pulls++;
+
sink(push(0));
+
} else {
+
sink(SignalKind.End);
+
expect(pulls).toBe(3);
+
}
}
-
}
-
}));
+
})
+
);
};
sinks.publish(source);
···
describe('toArray', () => {
it('sends Pull talkback signals every Push signal', () => {
let pulls = 0;
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
if (pulls < 3) {
-
pulls++;
-
sink(push(0));
-
} else {
-
sink(SignalKind.End);
-
expect(pulls).toBe(3);
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
if (pulls < 3) {
+
pulls++;
+
sink(push(0));
+
} else {
+
sink(SignalKind.End);
+
expect(pulls).toBe(3);
+
}
}
-
}
-
}));
+
})
+
);
};
const array = sinks.toArray(source);
···
let pulls = 0;
let ending = 0;
-
const source: Source<any> = (sink) => {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
if (!pulls) {
-
pulls++;
-
sink(push(0));
+
const source: Source<any> = sink => {
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
if (!pulls) {
+
pulls++;
+
sink(push(0));
+
}
+
} else {
+
ending++;
}
-
} else {
-
ending++;
-
}
-
}));
+
})
+
);
};
const array = sinks.toArray(source);
···
let pulls = 0;
let sink: Sink<any> | null = null;
-
const source: Source<any> = (_sink) => {
+
const source: Source<any> = _sink => {
sink = _sink;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
pulls++;
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
};
const fn = jest.fn();
···
const source: Source<any> = _sink => {
sink = _sink;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
pulls++;
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
};
Observable.from(observable.toObservable(source) as any).subscribe({
···
it('forwards cancellations from the Observable as a talkback', () => {
let ending = 0;
const source: Source<T> = sink =>
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close)
-
ending++;
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) ending++;
+
})
+
);
const sub = Observable.from(observable.toObservable(source) as any).subscribe({});
···
const source: Source<any> = _sink => {
sink = _sink;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull)
-
pulls++;
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) pulls++;
+
})
+
);
};
callbagIterate(fn)(callbag.toCallbag(source));
···
const fn = jest.fn();
const source: Source<any> = sink =>
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
sink(push(0));
-
} else {
-
ending++;
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
sink(push(0));
+
} else {
+
ending++;
+
}
+
})
+
);
callbagIterate(fn)(callbagTake(1)(callbag.toCallbag(source) as any));
+25 -98
src/__tests__/sources.test.ts
···
import callbagFromArray from 'callbag-from-iter';
import Observable from 'zen-observable';
-
const collectSignals = (
-
source: Source<any>,
-
onStart?: (talkbackCb: TalkbackFn) => void
-
) => {
+
const collectSignals = (source: Source<any>, onStart?: (talkbackCb: TalkbackFn) => void) => {
let talkback = talkbackPlaceholder;
const signals: Signal<any>[] = [];
source(signal => {
···
} else {
talkback(TalkbackKind.Pull);
}
-
})
+
});
return signals;
};
···
};
source(sink);
-
expect(signals).toEqual([
-
push(1),
-
push(2),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([push(1), push(2), SignalKind.End]);
});
};
···
});
describe('merge', () => {
-
const source = operators.merge<any>([
-
sources.fromValue(0),
-
sources.empty
-
]);
+
const source = operators.merge<any>([sources.fromValue(0), sources.empty]);
passesColdPull(source);
passesActiveClose(source);
it('correctly merges two sources where the second is empty', () => {
-
const source = operators.merge<any>([
-
sources.fromValue(0),
-
sources.empty
-
]);
+
const source = operators.merge<any>([sources.fromValue(0), sources.empty]);
-
expect(collectSignals(source)).toEqual([
-
start(expect.any(Function)),
-
push(0),
-
SignalKind.End,
-
]);
+
expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
});
it('correctly merges hot sources', () => {
···
const signals = collectSignals(source);
expect(onStart).toHaveBeenCalledTimes(2);
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
push(2),
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), push(2)]);
});
it('correctly merges asynchronous sources', () => {
···
const onStart = jest.fn();
const source = operators.merge<any>([
operators.onStart(onStart)(sources.fromValue(-1)),
-
operators.onStart(onStart)(
-
operators.take(2)(sources.interval(50))
-
),
+
operators.onStart(onStart)(operators.take(2)(sources.interval(50))),
]);
const signals = collectSignals(source);
···
});
describe('concat', () => {
-
const source = operators.concat<any>([
-
sources.fromValue(0),
-
sources.empty
-
]);
+
const source = operators.concat<any>([sources.fromValue(0), sources.empty]);
passesColdPull(source);
passesActiveClose(source);
it('correctly concats two sources where the second is empty', () => {
-
const source = operators.concat<any>([
-
sources.fromValue(0),
-
sources.empty
-
]);
+
const source = operators.concat<any>([sources.fromValue(0), sources.empty]);
-
expect(collectSignals(source)).toEqual([
-
start(expect.any(Function)),
-
push(0),
-
SignalKind.End,
-
]);
+
expect(collectSignals(source)).toEqual([start(expect.any(Function)), push(0), SignalKind.End]);
});
});
···
expect(signals).toEqual([start(expect.any(Function))]);
jest.runAllTimers();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
});
it('supports active cancellation', () => {
···
const { source, next, complete } = sources.makeSubject();
const signals = collectSignals(source);
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
next(1);
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1)]);
complete();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
});
it('ignores signals after complete has been called', () => {
···
const signals = collectSignals(source);
complete();
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);
next(1);
complete();
···
describe('never', () => {
it('emits nothing and ends immediately', () => {
const signals = collectSignals(sources.never);
-
expect(signals).toEqual([start(expect.any(Function)) ]);
+
expect(signals).toEqual([start(expect.any(Function))]);
});
});
···
it('emits nothing and ends immediately', () => {
const signals = collectSignals(sources.empty);
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), SignalKind.End]);
});
});
···
const promise = Promise.resolve(1);
const signals = collectSignals(sources.fromPromise(promise));
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
await promise;
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), SignalKind.End]);
});
});
···
await new Promise(resolve => setTimeout(resolve));
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
push(2),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
});
it('supports cancellation on converted Observables', async () => {
···
await new Promise(resolve => setTimeout(resolve));
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
});
});
···
const source = callbag.fromCallbag(callbagFromArray([1, 2]) as any);
const signals = collectSignals(source);
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
push(1),
-
push(2),
-
SignalKind.End,
-
]);
+
expect(signals).toEqual([start(expect.any(Function)), push(1), push(2), SignalKind.End]);
});
it('supports cancellation on converted Observables', () => {
···
talkback(TalkbackKind.Close);
});
-
expect(signals).toEqual([
-
start(expect.any(Function)),
-
]);
+
expect(signals).toEqual([start(expect.any(Function))]);
});
});
···
const sink: Sink<any> = signal => {
expect(signal).not.toBe(SignalKind.End);
-
if ((signal as any).tag === SignalKind.Start)
-
talkback = signal[0];
+
if ((signal as any).tag === SignalKind.Start) talkback = signal[0];
};
sources.fromDomEvent(element as any, 'click')(sink);
+13 -11
src/callbag.ts
···
-
import { Source, SignalKind, TalkbackKind } from './types'
-
import { push, start } from './helpers'
+
import { Source, SignalKind, TalkbackKind } from './types';
+
import { push, start } from './helpers';
interface Callbag<I, O> {
(t: 0, d: Callbag<O, I>): void;
···
}
export function fromCallbag<T>(callbag: Callbag<any, T>): Source<T> {
-
return (sink) => {
+
return sink => {
callbag(0, (signal: number, data: any) => {
if (signal === 0) {
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Pull) {
-
data(1);
-
} else {
-
data(2);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Pull) {
+
data(1);
+
} else {
+
data(2);
+
}
+
})
+
);
} else if (signal === 1) {
sink(push(data));
} else if (signal === 2) {
···
export function toCallbag<T>(source: Source<T>): Callbag<any, T> {
return (signal: number, sink: any) => {
if (signal === 0) {
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
sink(2);
} else if (signal.tag === SignalKind.Start) {
+9 -5
src/helpers.ts
···
-
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types'
+
import { TalkbackFn, TeardownFn, Start, Push, SignalKind } from './types';
-
export const talkbackPlaceholder: TalkbackFn = (_signal) => {/*noop*/};
-
export const teardownPlaceholder: TeardownFn = () => {/*noop*/};
+
export const talkbackPlaceholder: TalkbackFn = _signal => {
+
/*noop*/
+
};
+
export const teardownPlaceholder: TeardownFn = () => {
+
/*noop*/
+
};
export function start<T>(talkback: TalkbackFn): Start<T> {
const box: any = [talkback];
box.tag = SignalKind.Start;
-
return box
+
return box;
}
export function push<T>(value: T): Push<T> {
const box: any = [value];
box.tag = SignalKind.Push;
-
return box
+
return box;
}
+7 -7
src/index.ts
···
-
export * from './types'
-
export * from './sources'
-
export * from './operators'
-
export * from './sinks'
-
export * from './observable'
-
export * from './callbag'
-
export * from './pipe'
+
export * from './types';
+
export * from './sources';
+
export * from './operators';
+
export * from './sinks';
+
export * from './observable';
+
export * from './callbag';
+
export * from './pipe';
+16 -11
src/observable.ts
···
-
import { Source, SignalKind, TalkbackKind } from './types'
-
import { push, start, talkbackPlaceholder } from './helpers'
+
import { Source, SignalKind, TalkbackKind } from './types';
+
import { push, start, talkbackPlaceholder } from './helpers';
interface ObservableSubscription {
closed?: boolean;
···
subscribe(observer: ObservableObserver<T>): ObservableSubscription;
}
-
const observableSymbol: unique symbol = typeof Symbol === 'function'
-
? (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable'))
-
: '@@observable'
+
const observableSymbol: unique symbol =
+
typeof Symbol === 'function'
+
? (Symbol as any).observable || ((Symbol as any).observable = Symbol('observable'))
+
: '@@observable';
export function fromObservable<T>(input: Observable<T>): Source<T> {
const observable: Observable<T> = input[observableSymbol]
? (input as any)[observableSymbol]()
: input;
-
return (sink) => {
+
return sink => {
const subscription = observable.subscribe({
next(value: T) {
sink(push(value));
···
complete() {
sink(SignalKind.End);
},
-
error() {/*noop*/},
+
error() {
+
/*noop*/
+
},
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) subscription.unsubscribe();
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) subscription.unsubscribe();
+
})
+
);
};
}
···
subscribe(observer: ObservableObserver<T>) {
let talkback = talkbackPlaceholder;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
+266 -244
src/operators.ts
···
-
import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types'
-
import { push, start, talkbackPlaceholder } from './helpers'
-
import { fromArray } from './sources'
+
import { Source, Sink, Operator, SignalKind, TalkbackKind, TalkbackFn } from './types';
+
import { push, start, talkbackPlaceholder } from './helpers';
+
import { fromArray } from './sources';
const identity = <T>(x: T): T => x;
export function buffer<S, T>(notifier: Source<S>): Operator<T, T[]> {
-
return (source) => (sink) => {
+
return source => sink => {
let buffer: T[] = [];
let sourceTalkback = talkbackPlaceholder;
let notifierTalkback = talkbackPlaceholder;
let pulled = false;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
sourceTalkback = signal[0];
-
notifier((signal) => {
+
notifier(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
}
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
sourceTalkback(TalkbackKind.Close);
-
notifierTalkback(TalkbackKind.Close);
-
} else if (!ended && !pulled) {
-
pulled = true;
-
sourceTalkback(TalkbackKind.Pull);
-
notifierTalkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
sourceTalkback(TalkbackKind.Close);
+
notifierTalkback(TalkbackKind.Close);
+
} else if (!ended && !pulled) {
+
pulled = true;
+
sourceTalkback(TalkbackKind.Pull);
+
notifierTalkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function combine<A, B>(sourceA: Source<A>, sourceB: Source<B>): Source<[A, B]> {
-
return (sink) => {
+
return sink => {
let lastValA: A | void;
let lastValB: B | void;
let talkbackA = talkbackPlaceholder;
···
let gotSignal = false;
let gotEnd = false;
let ended = false;
-
sourceA((signal) => {
+
sourceA(signal => {
if (signal === SignalKind.End) {
if (!gotEnd) {
gotEnd = true;
···
sink(push([lastValA, lastValB] as [A, B]));
}
});
-
sourceB((signal) => {
+
sourceB(signal => {
if (signal === SignalKind.End) {
if (!gotEnd) {
gotEnd = true;
···
sink(push([lastValA, lastValB] as [A, B]));
}
});
-
sink(start((signal) => {
-
if (ended) {
-
/*noop*/
-
} else if (signal === TalkbackKind.Close) {
-
ended = true;
-
talkbackA(TalkbackKind.Close);
-
talkbackB(TalkbackKind.Close);
-
} else if (!gotSignal) {
-
gotSignal = true;
-
talkbackA(TalkbackKind.Pull);
-
talkbackB(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (ended) {
+
/*noop*/
+
} else if (signal === TalkbackKind.Close) {
+
ended = true;
+
talkbackA(TalkbackKind.Close);
+
talkbackB(TalkbackKind.Close);
+
} else if (!gotSignal) {
+
gotSignal = true;
+
talkbackA(TalkbackKind.Pull);
+
talkbackB(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function concatMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
-
return (source) => (sink) => {
+
return source => sink => {
const inputQueue: In[] = [];
let outerTalkback = talkbackPlaceholder;
let innerTalkback = talkbackPlaceholder;
···
let ended = false;
function applyInnerSource(innerSource: Source<Out>): void {
innerActive = true;
-
innerSource((signal) => {
+
innerSource(signal => {
if (signal === SignalKind.End) {
if (innerActive) {
innerActive = false;
if (inputQueue.length) {
-
applyInnerSource(map(inputQueue.shift()!))
+
applyInnerSource(map(inputQueue.shift()!));
} else if (ended) {
sink(SignalKind.End);
} else if (!outerPulled) {
···
}
});
}
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
ended = true;
-
if (!innerActive && !inputQueue.length)
-
sink(SignalKind.End);
+
if (!innerActive && !inputQueue.length) sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
outerTalkback = signal[0];
} else {
···
}
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
if (!ended) {
-
ended = true;
-
outerTalkback(TalkbackKind.Close);
-
}
-
if (innerActive) {
-
innerActive = false;
-
innerTalkback(TalkbackKind.Close);
-
}
-
} else {
-
if (!ended && !outerPulled) {
-
outerPulled = true;
-
outerTalkback(TalkbackKind.Pull);
-
}
-
if (innerActive && !innerPulled) {
-
innerPulled = true;
-
innerTalkback(TalkbackKind.Pull);
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
if (!ended) {
+
ended = true;
+
outerTalkback(TalkbackKind.Close);
+
}
+
if (innerActive) {
+
innerActive = false;
+
innerTalkback(TalkbackKind.Close);
+
}
+
} else {
+
if (!ended && !outerPulled) {
+
outerPulled = true;
+
outerTalkback(TalkbackKind.Pull);
+
}
+
if (innerActive && !innerPulled) {
+
innerPulled = true;
+
innerTalkback(TalkbackKind.Pull);
+
}
}
-
}
-
}));
+
})
+
);
};
}
···
}
export function filter<T>(predicate: (value: T) => boolean): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let talkback = talkbackPlaceholder;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
sink(signal);
} else if (signal.tag === SignalKind.Start) {
···
}
export function map<In, Out>(map: (value: In) => Out): Operator<In, Out> {
-
return (source) => (sink) => source((signal) => {
-
if (signal === SignalKind.End || signal.tag === SignalKind.Start) {
-
sink(signal);
-
} else {
-
sink(push(map(signal[0])));
-
}
-
});
+
return source => sink =>
+
source(signal => {
+
if (signal === SignalKind.End || signal.tag === SignalKind.Start) {
+
sink(signal);
+
} else {
+
sink(push(map(signal[0])));
+
}
+
});
}
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
-
return (source) => (sink) => {
+
return source => sink => {
const innerTalkbacks: TalkbackFn[] = [];
let outerTalkback = talkbackPlaceholder;
let outerPulled = false;
let ended = false;
function applyInnerSource(innerSource: Source<Out>): void {
let talkback = talkbackPlaceholder;
-
innerSource((signal) => {
+
innerSource(signal => {
if (signal === SignalKind.End) {
if (innerTalkbacks.length) {
const index = innerTalkbacks.indexOf(talkback);
···
}
}
} else if (signal.tag === SignalKind.Start) {
-
innerTalkbacks.push(talkback = signal[0]);
+
innerTalkbacks.push((talkback = signal[0]));
talkback(TalkbackKind.Pull);
} else if (innerTalkbacks.length) {
sink(signal);
···
}
});
}
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
ended = true;
-
if (!innerTalkbacks.length)
-
sink(SignalKind.End);
+
if (!innerTalkbacks.length) sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
outerTalkback = signal[0];
} else {
···
}
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
if (!ended) {
-
ended = true;
-
outerTalkback(TalkbackKind.Close);
-
}
-
while (innerTalkbacks.length)
-
innerTalkbacks.pop()!(TalkbackKind.Close);
-
} else {
-
if (!ended && !outerPulled) {
-
outerPulled = true;
-
outerTalkback(TalkbackKind.Pull);
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
if (!ended) {
+
ended = true;
+
outerTalkback(TalkbackKind.Close);
+
}
+
while (innerTalkbacks.length) innerTalkbacks.pop()!(TalkbackKind.Close);
} else {
-
outerPulled = false;
+
if (!ended && !outerPulled) {
+
outerPulled = true;
+
outerTalkback(TalkbackKind.Pull);
+
} else {
+
outerPulled = false;
+
}
+
for (let i = 0; i < innerTalkbacks.length; i++) innerTalkbacks[i](TalkbackKind.Pull);
}
-
for (let i = 0; i < innerTalkbacks.length; i++)
-
innerTalkbacks[i](TalkbackKind.Pull);
-
}
-
}));
+
})
+
);
};
}
···
}
export function onEnd<T>(callback: () => void): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
callback();
} else if (signal.tag === SignalKind.Start) {
const talkback = signal[0];
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
ended = true;
-
talkback(TalkbackKind.Close);
-
callback();
-
} else {
-
talkback(signal);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
ended = true;
+
talkback(TalkbackKind.Close);
+
callback();
+
} else {
+
talkback(signal);
+
}
+
})
+
);
} else {
sink(signal);
}
···
}
export function onPush<T>(callback: (value: T) => void): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
const talkback = signal[0];
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) ended = true;
-
talkback(signal);
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) ended = true;
+
talkback(signal);
+
})
+
);
} else {
callback(signal[0]);
sink(signal);
···
}
export function onStart<T>(callback: () => void): Operator<T, T> {
-
return (source) => (sink) => source((signal) => {
-
if (signal === SignalKind.End) {
-
sink(SignalKind.End);
-
} else if (signal.tag === SignalKind.Start) {
-
sink(signal);
-
callback();
-
} else {
-
sink(signal);
-
}
-
});
+
return source => sink =>
+
source(signal => {
+
if (signal === SignalKind.End) {
+
sink(SignalKind.End);
+
} else if (signal.tag === SignalKind.Start) {
+
sink(signal);
+
callback();
+
} else {
+
sink(signal);
+
}
+
});
}
export function sample<S, T>(notifier: Source<S>): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
let notifierTalkback = talkbackPlaceholder;
let value: T | void;
let pulled = false;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
}
}
});
-
notifier((signal) => {
+
notifier(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(signal);
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
sourceTalkback(TalkbackKind.Close);
-
notifierTalkback(TalkbackKind.Close);
-
} else if (!ended && !pulled) {
-
pulled = true;
-
sourceTalkback(TalkbackKind.Pull);
-
notifierTalkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
sourceTalkback(TalkbackKind.Close);
+
notifierTalkback(TalkbackKind.Close);
+
} else if (!ended && !pulled) {
+
pulled = true;
+
sourceTalkback(TalkbackKind.Pull);
+
notifierTalkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function scan<In, Out>(reducer: (acc: Out, value: In) => Out, seed: Out): Operator<In, Out> {
-
return (source) => (sink) => {
+
return source => sink => {
let acc = seed;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
sink(signal);
} else {
-
sink(push(acc = reducer(acc, signal[0])));
+
sink(push((acc = reducer(acc, signal[0]))));
}
});
};
···
const sinks: Sink<T>[] = [];
let talkback = talkbackPlaceholder;
let gotSignal = false;
-
return (sink) => {
+
return sink => {
sinks.push(sink);
if (sinks.length === 1) {
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
while (sinks.length) sinks.pop()!(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
···
}
});
}
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
const index = sinks.indexOf(sink);
-
if (index > -1) sinks.splice(index, 1);
-
if (!sinks.length) talkback(TalkbackKind.Close);
-
} else if (signal === TalkbackKind.Pull && !gotSignal) {
-
gotSignal = true;
-
talkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
const index = sinks.indexOf(sink);
+
if (index > -1) sinks.splice(index, 1);
+
if (!sinks.length) talkback(TalkbackKind.Close);
+
} else if (signal === TalkbackKind.Pull && !gotSignal) {
+
gotSignal = true;
+
talkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function skip<T>(wait: number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let talkback = talkbackPlaceholder;
let rest = wait;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
···
}
export function skipUntil<S, T>(notifier: Source<S>): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
let notifierTalkback = talkbackPlaceholder;
let skip = true;
let pulled = false;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
sourceTalkback = signal[0];
-
notifier((signal) => {
+
notifier(signal => {
if (signal === SignalKind.End) {
if (skip) {
ended = true;
···
pulled = false;
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
sourceTalkback(TalkbackKind.Close);
-
if (skip) notifierTalkback(TalkbackKind.Close);
-
} else if (!ended && !pulled) {
-
pulled = true;
-
if (skip) notifierTalkback(TalkbackKind.Pull);
-
sourceTalkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
sourceTalkback(TalkbackKind.Close);
+
if (skip) notifierTalkback(TalkbackKind.Close);
+
} else if (!ended && !pulled) {
+
pulled = true;
+
if (skip) notifierTalkback(TalkbackKind.Pull);
+
sourceTalkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function skipWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let talkback = talkbackPlaceholder;
let skip = true;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
···
}
export function switchMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
-
return (source) => (sink) => {
+
return source => sink => {
let outerTalkback = talkbackPlaceholder;
let innerTalkback = talkbackPlaceholder;
let outerPulled = false;
···
let ended = false;
function applyInnerSource(innerSource: Source<Out>): void {
innerActive = true;
-
innerSource((signal) => {
+
innerSource(signal => {
if (!innerActive) {
/*noop*/
} else if (signal === SignalKind.End) {
···
}
});
}
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
ended = true;
-
if (!innerActive)
-
sink(SignalKind.End);
+
if (!innerActive) sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
outerTalkback = signal[0];
} else {
···
applyInnerSource(map(signal[0]));
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
if (!ended) {
-
ended = true;
-
outerTalkback(TalkbackKind.Close);
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
if (!ended) {
+
ended = true;
+
outerTalkback(TalkbackKind.Close);
+
}
+
if (innerActive) {
+
innerActive = false;
+
innerTalkback(TalkbackKind.Close);
+
}
+
} else {
+
if (!ended && !outerPulled) {
+
outerPulled = true;
+
outerTalkback(TalkbackKind.Pull);
+
}
+
if (innerActive && !innerPulled) {
+
innerPulled = true;
+
innerTalkback(TalkbackKind.Pull);
+
}
}
-
if (innerActive) {
-
innerActive = false;
-
innerTalkback(TalkbackKind.Close);
-
}
-
} else {
-
if (!ended && !outerPulled) {
-
outerPulled = true;
-
outerTalkback(TalkbackKind.Pull);
-
}
-
if (innerActive && !innerPulled) {
-
innerPulled = true;
-
innerTalkback(TalkbackKind.Pull);
-
}
-
}
-
}));
+
})
+
);
};
}
···
}
export function take<T>(max: number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let talkback = talkbackPlaceholder;
let ended = false;
let taken = 0;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(signal);
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
talkback(TalkbackKind.Close);
-
} else if (signal === TalkbackKind.Pull && !ended && taken < max) {
-
talkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
talkback(TalkbackKind.Close);
+
} else if (signal === TalkbackKind.Pull && !ended && taken < max) {
+
talkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function takeLast<T>(max: number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
const queue: T[] = [];
let talkback = talkbackPlaceholder;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
fromArray(queue)(sink);
} else if (signal.tag === SignalKind.Start) {
···
}
export function takeUntil<S, T>(notifier: Source<S>): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let sourceTalkback = talkbackPlaceholder;
let notifierTalkback = talkbackPlaceholder;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
sourceTalkback = signal[0];
-
notifier((signal) => {
+
notifier(signal => {
if (signal === SignalKind.End) {
/*noop*/
} else if (signal.tag === SignalKind.Start) {
···
sink(signal);
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
sourceTalkback(TalkbackKind.Close);
-
notifierTalkback(TalkbackKind.Close);
-
} else if (!ended) {
-
sourceTalkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
sourceTalkback(TalkbackKind.Close);
+
notifierTalkback(TalkbackKind.Close);
+
} else if (!ended) {
+
sourceTalkback(TalkbackKind.Pull);
+
}
+
})
+
);
};
}
export function takeWhile<T>(predicate: (value: T) => boolean): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let talkback = talkbackPlaceholder;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
}
export function debounce<T>(timing: (value: T) => number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let id: any | void;
let deferredEnded = false;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (ended) {
/*noop*/
} else if (signal === SignalKind.End) {
···
}
} else if (signal.tag === SignalKind.Start) {
const talkback = signal[0];
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
deferredEnded = false;
-
if (id) clearTimeout(id);
-
talkback(TalkbackKind.Close);
-
} else if (!ended) {
-
talkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
deferredEnded = false;
+
if (id) clearTimeout(id);
+
talkback(TalkbackKind.Close);
+
} else if (!ended) {
+
talkback(TalkbackKind.Pull);
+
}
+
})
+
);
} else {
if (id) clearTimeout(id);
id = setTimeout(() => {
···
}
export function delay<T>(wait: number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let active = 0;
-
source((signal) => {
+
source(signal => {
if (typeof signal !== 'number' && signal.tag === SignalKind.Start) {
sink(signal);
} else {
···
}
export function throttle<T>(timing: (value: T) => number): Operator<T, T> {
-
return (source) => (sink) => {
+
return source => sink => {
let skip = false;
let id: any | void;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
if (id) clearTimeout(id);
sink(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
const talkback = signal[0];
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
if (id) clearTimeout(id);
-
talkback(TalkbackKind.Close);
-
} else {
-
talkback(TalkbackKind.Pull);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
if (id) clearTimeout(id);
+
talkback(TalkbackKind.Close);
+
} else {
+
talkback(TalkbackKind.Pull);
+
}
+
})
+
);
} else if (!skip) {
skip = true;
if (id) clearTimeout(id);
···
};
}
-
export {
-
mergeAll as flatten,
-
onPush as tap,
-
}
+
export { mergeAll as flatten, onPush as tap };
+1 -2
src/pipe.ts
···
function pipe() {
let x = arguments[0];
-
for (let i = 1, l = arguments.length; i < l; i++)
-
x = arguments[i](x);
+
for (let i = 1, l = arguments.length; i < l; i++) x = arguments[i](x);
return x;
}
+11 -9
src/sinks.ts
···
-
import { Source, Subscription, TalkbackKind, SignalKind } from './types'
-
import { talkbackPlaceholder } from './helpers'
+
import { Source, Subscription, TalkbackKind, SignalKind } from './types';
+
import { talkbackPlaceholder } from './helpers';
export function subscribe<T>(subscriber: (value: T) => void) {
return (source: Source<T>): Subscription => {
let talkback = talkbackPlaceholder;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
ended = true;
} else if (signal.tag === SignalKind.Start) {
···
talkback(TalkbackKind.Close);
}
},
-
}
-
}
+
};
+
};
}
export function forEach<T>(subscriber: (value: T) => void) {
···
}
export function publish<T>(source: Source<T>): void {
-
subscribe((_value) => {/*noop*/})(source);
+
subscribe(_value => {
+
/*noop*/
+
})(source);
}
export function toArray<T>(source: Source<T>): T[] {
const values: T[] = [];
let talkback = talkbackPlaceholder;
let ended = false;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
ended = true;
} else if (signal.tag === SignalKind.Start) {
···
return new Promise(resolve => {
let talkback = talkbackPlaceholder;
let value: T | void;
-
source((signal) => {
+
source(signal => {
if (signal === SignalKind.End) {
resolve(value!);
} else if (signal.tag === SignalKind.Start) {
···
talkback(TalkbackKind.Pull);
}
});
-
})
+
});
}
+84 -70
src/sources.ts
···
-
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types'
-
import { push, start, talkbackPlaceholder } from './helpers'
+
import { Source, Sink, SignalKind, TalkbackKind, Observer, Subject, TeardownFn } from './types';
+
import { push, start, talkbackPlaceholder } from './helpers';
export function fromArray<T>(array: T[]): Source<T> {
-
return (sink) => {
+
return sink => {
let ended = false;
let looping = false;
let pulled = false;
let current = 0;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
ended = true;
-
} else if (looping) {
-
pulled = true;
-
} else {
-
pulled = true;
-
looping = true;
-
for (pulled = looping = true; pulled && !ended; current++) {
-
if (current < array.length) {
-
pulled = false;
-
sink(push(array[current]));
-
} else {
-
ended = true;
-
sink(SignalKind.End);
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
ended = true;
+
} else if (looping) {
+
pulled = true;
+
} else {
+
pulled = true;
+
looping = true;
+
for (pulled = looping = true; pulled && !ended; current++) {
+
if (current < array.length) {
+
pulled = false;
+
sink(push(array[current]));
+
} else {
+
ended = true;
+
sink(SignalKind.End);
+
}
}
+
looping = false;
}
-
looping = false;
-
}
-
}))
-
}
+
})
+
);
+
};
}
export function fromValue<T>(value: T): Source<T> {
-
return (sink) => {
+
return sink => {
let ended = false;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
ended = true;
-
} else if (!ended) {
-
ended = true;
-
sink(push(value));
-
sink(SignalKind.End);
-
}
-
}))
-
}
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
ended = true;
+
} else if (!ended) {
+
ended = true;
+
sink(push(value));
+
sink(SignalKind.End);
+
}
+
})
+
);
+
};
}
export function make<T>(produce: (observer: Observer<T>) => TeardownFn): Source<T> {
-
return (sink) => {
+
return sink => {
let ended = false;
const teardown = produce({
next(value: T) {
···
}
},
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close && !ended) {
-
ended = true;
-
teardown();
-
}
-
}));
-
}
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close && !ended) {
+
ended = true;
+
teardown();
+
}
+
})
+
);
+
};
}
export function makeSubject<T>(): Subject<T> {
···
return {
source(sink: Sink<T>) {
sinks.push(sink);
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
const index = sinks.indexOf(sink);
-
if (index > -1) sinks.splice(index, 1);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
const index = sinks.indexOf(sink);
+
if (index > -1) sinks.splice(index, 1);
+
}
+
})
+
);
},
next(value: T) {
if (!ended) {
···
export const empty: Source<any> = (sink: Sink<any>): void => {
let ended = false;
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) {
-
ended = true;
-
} else if (!ended) {
-
ended = true;
-
sink(SignalKind.End);
-
}
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) {
+
ended = true;
+
} else if (!ended) {
+
ended = true;
+
sink(SignalKind.End);
+
}
+
})
+
);
};
export const never: Source<any> = (sink: Sink<any>): void => {
···
};
export function interval(ms: number): Source<number> {
-
return (sink) => {
+
return sink => {
let i = 0;
const id = setInterval(() => {
sink(push(i++));
}, ms);
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close)
-
clearInterval(id);
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) clearInterval(id);
+
})
+
);
};
}
export function fromDomEvent(element: HTMLElement, event: string): Source<Event> {
-
return (sink) => {
+
return sink => {
const handler = (payload: Event) => {
sink(push(payload));
};
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close)
-
element.removeEventListener(event, handler);
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) element.removeEventListener(event, handler);
+
})
+
);
element.addEventListener(event, handler);
};
}
export function fromPromise<T>(promise: Promise<T>): Source<T> {
-
return (sink) => {
+
return sink => {
let ended = false;
-
promise.then((value) => {
+
promise.then(value => {
if (!ended) {
sink(push(value));
sink(SignalKind.End);
}
});
-
sink(start((signal) => {
-
if (signal === TalkbackKind.Close) ended = true;
-
}));
+
sink(
+
start(signal => {
+
if (signal === TalkbackKind.Close) ended = true;
+
})
+
);
};
}
+3 -3
src/types.ts
···
}
export interface Tag<T> {
-
tag: T
+
tag: T;
}
-
export type Start<_T> = (Tag<SignalKind.Start> & [TalkbackFn])
-
export type Push<T> = (Tag<SignalKind.Push> & [T])
+
export type Start<_T> = Tag<SignalKind.Start> & [TalkbackFn];
+
export type Push<T> = Tag<SignalKind.Push> & [T];
export type Signal<T> = Start<T> | Push<T> | SignalKind.End;
export type Sink<T> = (signal: Signal<T>) => void;