Skip to content

Commit c50f528

Browse files
committed
fix(forEach): ensure that teardown logic is called when nextHandler throws
fixes #1411
1 parent 3477bd5 commit c50f528

File tree

2 files changed

+77
-9
lines changed

2 files changed

+77
-9
lines changed

spec/Observable-spec.ts

+56
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,62 @@ describe('Observable', () => {
8080
})
8181
.then(done);
8282
});
83+
84+
it('should handle a synchronous throw from the next handler and tear down', (done: DoneSignature) => {
85+
let unsubscribeCalled = false;
86+
const syncObservable = new Observable<number>((observer: Rx.Observer<number>) => {
87+
observer.next(1);
88+
observer.next(2);
89+
observer.next(3);
90+
91+
return () => {
92+
unsubscribeCalled = true;
93+
};
94+
});
95+
96+
const results = [];
97+
syncObservable.forEach((x) => {
98+
results.push(x);
99+
if (x === 2) {
100+
throw new Error('I told, you Bobby Boucher, twos are the debil!');
101+
}
102+
}).then(
103+
() => done.fail(),
104+
(err) => {
105+
results.push(err);
106+
expect(results).toEqual([1, 2, new Error('I told, you Bobby Boucher, twos are the debil!')]);
107+
expect(unsubscribeCalled).toBe(true);
108+
done();
109+
});
110+
});
111+
112+
it('should handle an asynchronous throw from the next handler and tear down', (done: DoneSignature) => {
113+
let unsubscribeCalled = false;
114+
const syncObservable = new Observable<number>((observer: Rx.Observer<number>) => {
115+
let i = 1;
116+
const id = setInterval(() => observer.next(i++));
117+
118+
return () => {
119+
clearInterval(id);
120+
unsubscribeCalled = true;
121+
};
122+
});
123+
124+
const results = [];
125+
syncObservable.forEach((x) => {
126+
results.push(x);
127+
if (x === 2) {
128+
throw new Error('I told, you Bobby Boucher, twos are the debil!');
129+
}
130+
}).then(
131+
() => done.fail(),
132+
(err) => {
133+
results.push(err);
134+
expect(results).toEqual([1, 2, new Error('I told, you Bobby Boucher, twos are the debil!')]);
135+
expect(unsubscribeCalled).toBe(true);
136+
done();
137+
});
138+
});
83139
});
84140

85141
describe('subscribe', () => {

src/Observable.ts

+21-9
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ import {root} from './util/root';
66
import {CoreOperators} from './CoreOperators';
77
import {SymbolShim} from './util/SymbolShim';
88
import {toSubscriber} from './util/toSubscriber';
9-
import {tryCatch} from './util/tryCatch';
10-
import {errorObject} from './util/errorObject';
119

1210
import {combineLatestStatic} from './operator/combineLatest';
1311
import {concatStatic} from './operator/concat';
@@ -230,13 +228,27 @@ export class Observable<T> implements CoreOperators<T> {
230228
throw new Error('no Promise impl found');
231229
}
232230

233-
const source = this;
234-
235231
return new PromiseCtor<void>((resolve, reject) => {
236-
source.subscribe((value: T) => {
237-
const result: any = tryCatch(next)(value);
238-
if (result === errorObject) {
239-
reject(errorObject.e);
232+
const subscription = this.subscribe((value) => {
233+
if (subscription) {
234+
// if there is a subscription, then we can surmise
235+
// the next handling is asynchronous. Any errors thrown
236+
// need to be rejected explicitly and unsubscribe must be
237+
// called manually
238+
try {
239+
next(value);
240+
} catch (err) {
241+
reject(err);
242+
subscription.unsubscribe();
243+
}
244+
} else {
245+
// if there is NO subscription, then we're getting a nexted
246+
// value synchronously during subscription. We can just call it.
247+
// If it errors, Observable's `subscribe` imple will ensure the
248+
// unsubscription logic is called, then synchronously rethrow the error.
249+
// After that, Promise will trap the error and send it
250+
// down the rejection path.
251+
next(value);
240252
}
241253
}, reject, resolve);
242254
});
@@ -369,4 +381,4 @@ export class Observable<T> implements CoreOperators<T> {
369381
[SymbolShim.observable]() {
370382
return this;
371383
}
372-
}
384+
}

0 commit comments

Comments
 (0)