Mirror: 🎩 A tiny but capable push & pull stream library for TypeScript and Flow

Fix close talkback on {concat,merge,switch}Map

The Close event was not being respected and sent
to the inner talkback when the outer source has
already ended.

+7 -5
src/operators/wonka_operator_concatMap.re
···
sink(.Start((.signal) => {
switch (signal) {
| Pull => if (!state.ended) state.innerTalkback(.Pull)
-
| Close when !state.ended => {
-
state.ended = true;
-
state.closed = true;
-
state.outerTalkback(.Close);
+
| Close => {
state.innerTalkback(.Close);
+
if (!state.ended) {
+
state.ended = true;
+
state.closed = true;
+
state.outerTalkback(.Close);
+
state.innerTalkback = talkbackPlaceholder;
+
}
}
-
| Close => ()
}
}));
}));
+7 -5
src/operators/wonka_operator_mergeMap.re
···
sink(.Start((.signal) => {
switch (signal) {
-
| Close when !state.ended => {
-
state.ended = true;
-
state.outerTalkback(.Close);
+
| Close => {
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close));
-
state.innerTalkbacks = Rebel.Array.makeEmpty();
+
if (!state.ended) {
+
state.ended = true;
+
state.outerTalkback(.Close);
+
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Close));
+
state.innerTalkbacks = Rebel.Array.makeEmpty();
+
}
}
-
| Close => ()
| Pull when !state.ended =>
Rebel.Array.forEach(state.innerTalkbacks, talkback => talkback(.Pull));
| Pull => ()
+7 -6
src/operators/wonka_operator_switchMap.re
···
sink(.Start((.signal) => {
switch (signal) {
| Pull => state.innerTalkback(.Pull)
-
| Close when !state.ended => {
-
state.ended = true;
-
state.closed = true;
-
state.outerTalkback(.Close);
+
| Close => {
state.innerTalkback(.Close);
-
state.innerTalkback = talkbackPlaceholder;
+
if (!state.ended) {
+
state.ended = true;
+
state.closed = true;
+
state.outerTalkback(.Close);
+
state.innerTalkback = talkbackPlaceholder;
+
}
}
-
| Close => ()
}
}));
}));