Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix looping behaviour to be identical as to before

Changed files
+16 -12
src
+12 -8
src/operators.ts
···
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
-
const innerTalkbacks: TalkbackFn[] = [];
let outerTalkback = talkbackPlaceholder;
let outerPulled = false;
let ended = false;
···
if (signal === SignalKind.End) {
if (innerTalkbacks.length) {
const index = innerTalkbacks.indexOf(talkback);
-
if (index > -1) innerTalkbacks.splice(index, 1);
if (!innerTalkbacks.length) {
if (ended) {
sink(SignalKind.End);
···
ended = true;
outerTalkback(TalkbackKind.Close);
}
-
while (innerTalkbacks.length) innerTalkbacks.shift()!(TalkbackKind.Close);
} else {
if (!ended && !outerPulled) {
outerPulled = true;
···
} else {
outerPulled = false;
}
-
for (let i = 0; i < innerTalkbacks.length; i++) innerTalkbacks[i](TalkbackKind.Pull);
}
})
);
···
}
export function share<T>(source: Source<T>): Source<T> {
-
const sinks: Sink<T>[] = [];
let talkback = talkbackPlaceholder;
let gotSignal = false;
return sink => {
···
if (sinks.length === 1) {
source(signal => {
if (signal === SignalKind.End) {
-
while (sinks.length) sinks.pop()!(SignalKind.End);
} else if (signal.tag === SignalKind.Start) {
talkback = signal[0];
} else {
gotSignal = false;
-
for (let i = 0; i < sinks.length; i++) sinks[i](signal);
}
});
}
···
start(signal => {
if (signal === TalkbackKind.Close) {
const index = sinks.indexOf(sink);
-
if (index > -1) sinks.splice(index, 1);
if (!sinks.length) talkback(TalkbackKind.Close);
} else if (!gotSignal) {
gotSignal = true;
···
export function mergeMap<In, Out>(map: (value: In) => Source<Out>): Operator<In, Out> {
return source => sink => {
+
let innerTalkbacks: TalkbackFn[] = [];
let outerTalkback = talkbackPlaceholder;
let outerPulled = false;
let ended = false;
···
if (signal === SignalKind.End) {
if (innerTalkbacks.length) {
const index = innerTalkbacks.indexOf(talkback);
+
if (index > -1) (innerTalkbacks = innerTalkbacks.slice()).splice(index, 1);
if (!innerTalkbacks.length) {
if (ended) {
sink(SignalKind.End);
···
ended = true;
outerTalkback(TalkbackKind.Close);
}
+
for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
+
a[i](TalkbackKind.Close);
+
innerTalkbacks.length = 0;
} else {
if (!ended && !outerPulled) {
outerPulled = true;
···
} else {
outerPulled = false;
}
+
for (let i = 0, a = innerTalkbacks, l = innerTalkbacks.length; i < l; i++)
+
a[i](TalkbackKind.Pull);
}
})
);
···
}
export function share<T>(source: Source<T>): Source<T> {
+
let sinks: Sink<T>[] = [];
let talkback = talkbackPlaceholder;
let gotSignal = false;
return sink => {
···
if (sinks.length === 1) {
source(signal => {
if (signal === SignalKind.End) {
+
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
+
sinks.length = 0;
} else if (signal.tag === SignalKind.Start) {
talkback = signal[0];
} else {
gotSignal = false;
+
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
}
});
}
···
start(signal => {
if (signal === TalkbackKind.Close) {
const index = sinks.indexOf(sink);
+
if (index > -1) (sinks = sinks.slice()).splice(index, 1);
if (!sinks.length) talkback(TalkbackKind.Close);
} else if (!gotSignal) {
gotSignal = true;
+4 -4
src/sources.ts
···
}
export function makeSubject<T>(): Subject<T> {
-
const sinks: Sink<T>[] = [];
let ended = false;
return {
source(sink: Sink<T>) {
···
start(signal => {
if (signal === TalkbackKind.Close) {
const index = sinks.indexOf(sink);
-
if (index > -1) sinks.splice(index, 1);
}
})
);
···
next(value: T) {
if (!ended) {
const signal = push(value);
-
for (let i = 0; i < sinks.length; i++) sinks[i](signal);
}
},
complete() {
if (!ended) {
ended = true;
-
for (let i = 0; i < sinks.length; i++) sinks[i](SignalKind.End);
}
},
};
···
}
export function makeSubject<T>(): Subject<T> {
+
let sinks: Sink<T>[] = [];
let ended = false;
return {
source(sink: Sink<T>) {
···
start(signal => {
if (signal === TalkbackKind.Close) {
const index = sinks.indexOf(sink);
+
if (index > -1) (sinks = sinks.slice()).splice(index, 1);
}
})
);
···
next(value: T) {
if (!ended) {
const signal = push(value);
+
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](signal);
}
},
complete() {
if (!ended) {
ended = true;
+
for (let i = 0, a = sinks, l = sinks.length; i < l; i++) a[i](SignalKind.End);
}
},
};