Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Return subscription type from subscribe

+1 -1
__tests__/wonka_test.re
···
sink(.Start(Wonka_helpers.talkbackPlaceholder));
};
-
let unsubscribe = Wonka.subscribe(x => ignore(Js.Array.push(x, nums)), source);
push^();
push^();
···
sink(.Start(Wonka_helpers.talkbackPlaceholder));
};
+
let { unsubscribe } = Wonka.subscribe(x => ignore(Js.Array.push(x, nums)), source);
push^();
push^();
+2 -1
src/index.d.ts
···
-
export { Talkback, Signal, Sink, Source, Operator, Observer, Subject } from './wonka_types';
export * from './pipe';
export * from './wonka';
export * from './web/wonkaJs';
···
+
export { Talkback, Signal, Sink, Subscription, Source, Operator, Observer, Subject } from './wonka_types';
+
export * from './pipe';
export * from './wonka';
export * from './web/wonkaJs';
+3 -3
src/wonka.d.ts
···
-
import { List, Sink, Source, Operator, Observer, Subject } from './wonka_types';
export const makeSubject: <A>() => Subject<A>;
···
export const takeWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
export const takeUntil: <A>(signal: Source<any>) => Operator<A, A>;
export const skip: <A>(max: number) => Operator<A, A>;
-
export const skipWhile: <A>(f: (value: A) => boolean) => Operator<A, A>;
export const skipUntil: <A>(signal: Source<any>) => Operator<A, A>;
export const forEach: <A>(f: (x: A) => void) => (source: Source<A>) => void;
-
export const subscribe: <A>(f: (x: A) => void) => (source: Source<A>) => (() => void);
···
+
import { List, Sink, Source, Subscription, Operator, Observer, Subject } from './wonka_types';
export const makeSubject: <A>() => Subject<A>;
···
export const takeWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
export const takeUntil: <A>(signal: Source<any>) => Operator<A, A>;
export const skip: <A>(max: number) => Operator<A, A>;
+
export const skipWhile: <A>(f: (x: A) => boolean) => Operator<A, A>;
export const skipUntil: <A>(signal: Source<any>) => Operator<A, A>;
export const forEach: <A>(f: (x: A) => void) => (source: Source<A>) => void;
+
export const subscribe: <A>(f: (x: A) => void) => (source: Source<A>) => Subscription;
+6 -3
src/wonka.re
···
}
});
-
() => if (!ended^) {
-
ended := true;
-
talkback^(.Close);
}
});
···
}
});
+
{
+
unsubscribe: () =>
+
if (!ended^) {
+
ended := true;
+
talkback^(.Close);
+
}
}
});
+1 -1
src/wonka.rei
···
/* Similar to the `forEach` sink factory, but returns an anonymous function
that when called will end the stream immediately.
Ending the stream will propagate an End signal upwards to the root source. */
-
let subscribe: ('a => unit, sourceT('a)) => (unit => unit);
···
/* Similar to the `forEach` sink factory, but returns an anonymous function
that when called will end the stream immediately.
Ending the stream will propagate an End signal upwards to the root source. */
+
let subscribe: ('a => unit, sourceT('a)) => subscriptionT;
+4 -9
src/wonka_types.d.ts
···
export interface Source<A> { (sink: Sink<A>): void; }
export interface Operator<A, B> { (source: Source<A>): Source<B>; }
-
export interface Observer<A> {
-
[0]: (value: A) => void,
-
[1]: () => void
-
}
-
export interface Subject<A> {
-
[0]: Source<A>,
-
[1]: (value: A) => void,
-
[2]: () => void
-
}
···
export interface Source<A> { (sink: Sink<A>): void; }
export interface Operator<A, B> { (source: Source<A>): Source<B>; }
+
export type Subscription = [() => void];
+
+
export type Observer<A> = [(value: A) => void, () => void];
+
export type Subject<A> = [Source<A>, (value: A) => void, () => void];
+4
src/wonka_types.re
···
type sinkT('a) = (.signalT('a)) => unit;
type sourceT('a) = sinkT('a) => unit;
type observerT('a) = {
next: 'a => unit,
complete: unit => unit
···
type sinkT('a) = (.signalT('a)) => unit;
type sourceT('a) = sinkT('a) => unit;
+
type subscriptionT = {
+
unsubscribe: unit => unit
+
};
+
type observerT('a) = {
next: 'a => unit,
complete: unit => unit