Skip to content

Commit ee1a339

Browse files
committed
fix(exhaustMap): stop listening to a synchronous inner-obervable when unsubscribed
1 parent 260d52a commit ee1a339

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

spec/operators/exhaustMap-spec.ts

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
2-
import { Observable, of, from } from 'rxjs';
3-
import { exhaustMap, mergeMap } from 'rxjs/operators';
2+
import { concat, defer, Observable, of, from } from 'rxjs';
3+
import { exhaustMap, mergeMap, takeWhile } from 'rxjs/operators';
44
import { expect } from 'chai';
55

66
declare function asDiagram(arg: string): Function;
@@ -202,6 +202,31 @@ describe('exhaustMap', () => {
202202
expectSubscriptions(e1.subscriptions).toBe(e1subs);
203203
});
204204

205+
it('should stop listening to a synchronous observable when unsubscribed', () => {
206+
const sideEffects: number[] = [];
207+
const synchronousObservable = concat(
208+
defer(() => {
209+
sideEffects.push(1);
210+
return of(1);
211+
}),
212+
defer(() => {
213+
sideEffects.push(2);
214+
return of(2);
215+
}),
216+
defer(() => {
217+
sideEffects.push(3);
218+
return of(3);
219+
})
220+
);
221+
222+
of(null).pipe(
223+
exhaustMap(() => synchronousObservable),
224+
takeWhile((x) => x != 2) // unsubscribe at the second side-effect
225+
).subscribe(() => { /* noop */ });
226+
227+
expect(sideEffects).to.deep.equal([1, 2]);
228+
});
229+
205230
it('should switch inner cold observables, inner never completes', () => {
206231
const x = cold( '--a--b--c--| ');
207232
const xsubs = ' ^ ! ';

src/internal/operators/exhaustMap.ts

+12-5
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,22 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
106106
}
107107

108108
private tryNext(value: T): void {
109+
let result: ObservableInput<R>;
109110
const index = this.index++;
110-
const destination = this.destination;
111111
try {
112-
const result = this.project(value, index);
113-
this.hasSubscription = true;
114-
this.add(subscribeToResult(this, result, value, index));
112+
result = this.project(value, index);
115113
} catch (err) {
116-
destination.error(err);
114+
this.destination.error(err);
115+
return;
117116
}
117+
this.hasSubscription = true;
118+
this._innerSub(result, value, index);
119+
}
120+
121+
private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
122+
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
123+
this.add(innerSubscriber);
124+
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
118125
}
119126

120127
protected _complete(): void {

0 commit comments

Comments
 (0)