Skip to content

Commit d7f7078

Browse files
cartantbenlesh
authored andcommittedDec 6, 2019
fix: chain subscriptions from interop observables (#5059)
* test: add failing tests for interop observables * fix: check for interop subscriptions * chore: make subscribeToResult signatures safer * chore: add test comments * chore: add implementation comments
1 parent 07eb09b commit d7f7078

17 files changed

+311
-21
lines changed
 

‎spec/helpers/interop-helper-spec.ts

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { expect } from 'chai';
2+
import { Observable, of, Subscriber } from 'rxjs';
3+
import { observable as symbolObservable } from 'rxjs/internal/symbol/observable';
4+
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';
5+
import { asInteropObservable, asInteropSubscriber } from './interop-helper';
6+
7+
describe('interop helper', () => {
8+
it('should simulate interop observables', () => {
9+
const observable = asInteropObservable(of(42));
10+
expect(observable).to.not.be.instanceOf(Observable);
11+
expect(observable[symbolObservable]).to.be.a('function');
12+
});
13+
14+
it('should simulate interop subscribers', () => {
15+
const subscriber = asInteropSubscriber(new Subscriber());
16+
expect(subscriber).to.not.be.instanceOf(Subscriber);
17+
expect(subscriber[symbolSubscriber]).to.be.undefined;
18+
});
19+
});

‎spec/helpers/interop-helper.ts

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { Observable, Subscriber, Subscription } from 'rxjs';
2+
import { rxSubscriber as symbolSubscriber } from 'rxjs/internal/symbol/rxSubscriber';
3+
4+
/**
5+
* Returns an observable that will be deemed by this package's implementation
6+
* to be an observable that requires interop. The returned observable will fail
7+
* the `instanceof Observable` test and will deem any `Subscriber` passed to
8+
* its `subscribe` method to be untrusted.
9+
*/
10+
export function asInteropObservable<T>(observable: Observable<T>): Observable<T> {
11+
return new Proxy(observable, {
12+
get(target: Observable<T>, key: string | number | symbol) {
13+
if (key === 'subscribe') {
14+
const { subscribe } = target;
15+
return interopSubscribe(subscribe);
16+
}
17+
return Reflect.get(target, key);
18+
},
19+
getPrototypeOf(target: Observable<T>) {
20+
const { subscribe, ...rest } = Object.getPrototypeOf(target);
21+
return {
22+
...rest,
23+
subscribe: interopSubscribe(subscribe)
24+
};
25+
}
26+
});
27+
}
28+
29+
/**
30+
* Returns a subscriber that will be deemed by this package's implementation to
31+
* be untrusted. The returned subscriber will fail the `instanceof Subscriber`
32+
* test and will not include the symbol that identifies trusted subscribers.
33+
*/
34+
export function asInteropSubscriber<T>(subscriber: Subscriber<T>): Subscriber<T> {
35+
return new Proxy(subscriber, {
36+
get(target: Subscriber<T>, key: string | number | symbol) {
37+
if (key === symbolSubscriber) {
38+
return undefined;
39+
}
40+
return Reflect.get(target, key);
41+
},
42+
getPrototypeOf(target: Subscriber<T>) {
43+
const { [symbolSubscriber]: symbol, ...rest } = Object.getPrototypeOf(target);
44+
return rest;
45+
}
46+
});
47+
}
48+
49+
function interopSubscribe<T>(subscribe: (...args: any[]) => Subscription) {
50+
return function (this: Observable<T>, ...args: any[]): Subscription {
51+
const [arg] = args;
52+
if (arg instanceof Subscriber) {
53+
return subscribe.call(this, asInteropSubscriber(arg));
54+
}
55+
return subscribe.apply(this, args);
56+
};
57+
}

‎spec/operators/catch-spec.ts

+24
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as sinon from 'sinon';
55
import { createObservableInputs } from '../helpers/test-helper';
66
import { TestScheduler } from 'rxjs/testing';
77
import { observableMatcher } from '../helpers/observableMatcher';
8+
import { asInteropObservable } from '../helpers/interop-helper';
89

910
declare function asDiagram(arg: string): Function;
1011

@@ -139,6 +140,29 @@ describe('catchError operator', () => {
139140
});
140141
});
141142

143+
it('should unsubscribe from a caught cold caught interop observable when unsubscribed explicitly', () => {
144+
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
145+
const e1 = hot(' -1-2-3-# ');
146+
const e1subs = ' ^------! ';
147+
const e2 = cold(' 5-6-7-8-9-|');
148+
const e2subs = ' -------^----! ';
149+
const expected = '-1-2-3-5-6-7- ';
150+
const unsub = ' ------------! ';
151+
152+
// This test is the same as the previous test, but the observable is
153+
// manipulated to make it look like an interop observable - an observable
154+
// from a foreign library. Interop subscribers are treated differently:
155+
// they are wrapped in a safe subscriber. This test ensures that
156+
// unsubscriptions are chained all the way to the interop subscriber.
157+
158+
const result = e1.pipe(catchError(() => asInteropObservable(e2)));
159+
160+
expectObservable(result, unsub).toBe(expected);
161+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
162+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
163+
});
164+
});
165+
142166
it('should stop listening to a synchronous observable when unsubscribed', () => {
143167
const sideEffects: number[] = [];
144168
const synchronousObservable = concat(

‎spec/operators/exhaustMap-spec.ts

+34
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/mar
22
import { concat, defer, Observable, of, from } from 'rxjs';
33
import { exhaustMap, mergeMap, takeWhile, map } from 'rxjs/operators';
44
import { expect } from 'chai';
5+
import { asInteropObservable } from '../helpers/interop-helper';
56

67
declare function asDiagram(arg: string): Function;
78

@@ -202,6 +203,39 @@ describe('exhaustMap', () => {
202203
expectSubscriptions(e1.subscriptions).toBe(e1subs);
203204
});
204205

206+
it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
207+
const x = cold( '--a--b--c--| ');
208+
const xsubs = ' ^ ! ';
209+
const y = cold( '--d--e--f--| ');
210+
const ysubs: string[] = [];
211+
const z = cold( '--g--h--i--| ');
212+
const zsubs = ' ^ ! ';
213+
const e1 = hot('---x---------y-----------------z-------------|');
214+
const e1subs = '^ ! ';
215+
const expected = '-----a--b--c---------------------g- ';
216+
const unsub = ' ! ';
217+
218+
const observableLookup = { x: x, y: y, z: z };
219+
220+
// This test is the same as the previous test, but the observable is
221+
// manipulated to make it look like an interop observable - an observable
222+
// from a foreign library. Interop subscribers are treated differently:
223+
// they are wrapped in a safe subscriber. This test ensures that
224+
// unsubscriptions are chained all the way to the interop subscriber.
225+
226+
const result = e1.pipe(
227+
mergeMap(x => of(x)),
228+
exhaustMap(value => asInteropObservable(observableLookup[value])),
229+
mergeMap(x => of(x))
230+
);
231+
232+
expectObservable(result, unsub).toBe(expected);
233+
expectSubscriptions(x.subscriptions).toBe(xsubs);
234+
expectSubscriptions(y.subscriptions).toBe(ysubs);
235+
expectSubscriptions(z.subscriptions).toBe(zsubs);
236+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
237+
});
238+
205239
it('should stop listening to a synchronous observable when unsubscribed', () => {
206240
const sideEffects: number[] = [];
207241
const synchronousObservable = concat(

‎spec/operators/mergeMap-spec.ts

+31
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
22
import { mergeMap, map } from 'rxjs/operators';
33
import { asapScheduler, defer, Observable, from, of, timer } from 'rxjs';
44
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
5+
import { asInteropObservable } from '../helpers/interop-helper';
56

67
declare const type: Function;
78
declare const asDiagram: Function;
@@ -260,6 +261,36 @@ describe('mergeMap', () => {
260261
expectSubscriptions(e1.subscriptions).toBe(e1subs);
261262
});
262263

264+
it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
265+
const x = cold( '--a--b--c--d--e--| ');
266+
const xsubs = ' ^ ! ';
267+
const y = cold( '---f---g---h---i--|');
268+
const ysubs = ' ^ ! ';
269+
const e1 = hot('---------x---------y---------| ');
270+
const e1subs = '^ ! ';
271+
const expected = '-----------a--b--c--d- ';
272+
const unsub = ' ! ';
273+
274+
const observableLookup = { x: x, y: y };
275+
276+
// This test manipulates the observable to make it look like an interop
277+
// observable - an observable from a foreign library. Interop subscribers
278+
// are treated differently: they are wrapped in a safe subscriber. This
279+
// test ensures that unsubscriptions are chained all the way to the
280+
// interop subscriber.
281+
282+
const result = e1.pipe(
283+
mergeMap(x => of(x)),
284+
mergeMap(value => asInteropObservable(observableLookup[value])),
285+
mergeMap(x => of(x)),
286+
);
287+
288+
expectObservable(result, unsub).toBe(expected);
289+
expectSubscriptions(x.subscriptions).toBe(xsubs);
290+
expectSubscriptions(y.subscriptions).toBe(ysubs);
291+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
292+
});
293+
263294
it('should mergeMap many outer to many inner, inner never completes', () => {
264295
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
265296
const e1 = hot('-a-------b-------c-------d-------| ');

‎spec/operators/onErrorResumeNext-spec.ts

+18
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { onErrorResumeNext, takeWhile } from 'rxjs/operators';
44
import { concat, defer, throwError, of } from 'rxjs';
5+
import { asInteropObservable } from '../helpers/interop-helper';
56

67
declare function asDiagram(arg: string): Function;
78

@@ -129,6 +130,23 @@ describe('onErrorResumeNext operator', () => {
129130
expect(sideEffects).to.deep.equal([1, 2]);
130131
});
131132

133+
it('should unsubscribe from an interop observble upon explicit unsubscription', () => {
134+
const source = hot('--a--b--#');
135+
const next = cold( '--c--d--');
136+
const nextSubs = ' ^ !';
137+
const subs = '^ !';
138+
const expected = '--a--b----c--';
139+
140+
// This test manipulates the observable to make it look like an interop
141+
// observable - an observable from a foreign library. Interop subscribers
142+
// are treated differently: they are wrapped in a safe subscriber. This
143+
// test ensures that unsubscriptions are chained all the way to the
144+
// interop subscriber.
145+
146+
expectObservable(source.pipe(onErrorResumeNext(asInteropObservable(next))), subs).toBe(expected);
147+
expectSubscriptions(next.subscriptions).toBe(nextSubs);
148+
});
149+
132150
it('should work with promise', (done: MochaDone) => {
133151
const expected = [1, 2];
134152
const source = concat(of(1), throwError('meh'));

‎spec/operators/skipUntil-spec.ts

+26-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { concat, defer, Observable, of, Subject } from 'rxjs';
44
import { skipUntil, mergeMap } from 'rxjs/operators';
5+
import { asInteropObservable } from '../helpers/interop-helper';
56

67
declare function asDiagram(arg: string): Function;
78

@@ -97,6 +98,31 @@ describe('skipUntil', () => {
9798
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
9899
});
99100

101+
it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
102+
const e1 = hot('--a--b--c--d--e----|');
103+
const e1subs = '^ ! ';
104+
const skip = hot('-------------x--| ');
105+
const skipSubs = '^ ! ';
106+
const expected = ('---------- ');
107+
const unsub = ' ! ';
108+
109+
// This test is the same as the previous test, but the observable is
110+
// manipulated to make it look like an interop observable - an observable
111+
// from a foreign library. Interop subscribers are treated differently:
112+
// they are wrapped in a safe subscriber. This test ensures that
113+
// unsubscriptions are chained all the way to the interop subscriber.
114+
115+
const result = e1.pipe(
116+
mergeMap(x => of(x)),
117+
skipUntil(asInteropObservable(skip)),
118+
mergeMap(x => of(x)),
119+
);
120+
121+
expectObservable(result, unsub).toBe(expected);
122+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
123+
expectSubscriptions(skip.subscriptions).toBe(skipSubs);
124+
});
125+
100126
it('should skip all elements when notifier is empty', () => {
101127
const e1 = hot('--a--b--c--d--e--|');
102128
const e1subs = '^ !';
@@ -248,7 +274,6 @@ describe('skipUntil', () => {
248274
});
249275

250276
it('should stop listening to a synchronous notifier after its first nexted value', () => {
251-
// const source = hot('-^-o---o---o---o---o---o---|');
252277
const sideEffects: number[] = [];
253278
const synchronousNotifer = concat(
254279
defer(() => {

‎spec/operators/switchMap-spec.ts

+31
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { switchMap, mergeMap, map, takeWhile } from 'rxjs/operators';
44
import { concat, defer, of, Observable } from 'rxjs';
5+
import { asInteropObservable } from '../helpers/interop-helper';
56

67
declare function asDiagram(arg: string): Function;
78

@@ -169,6 +170,36 @@ describe('switchMap', () => {
169170
expectSubscriptions(e1.subscriptions).toBe(e1subs);
170171
});
171172

173+
it('should not break unsubscription chains with interop inners when result is unsubscribed explicitly', () => {
174+
const x = cold( '--a--b--c--d--e--| ');
175+
const xsubs = ' ^ ! ';
176+
const y = cold( '---f---g---h---i--|');
177+
const ysubs = ' ^ ! ';
178+
const e1 = hot('---------x---------y---------| ');
179+
const e1subs = '^ ! ';
180+
const expected = '-----------a--b--c---- ';
181+
const unsub = ' ! ';
182+
183+
const observableLookup = { x: x, y: y };
184+
185+
// This test is the same as the previous test, but the observable is
186+
// manipulated to make it look like an interop observable - an observable
187+
// from a foreign library. Interop subscribers are treated differently:
188+
// they are wrapped in a safe subscriber. This test ensures that
189+
// unsubscriptions are chained all the way to the interop subscriber.
190+
191+
const result = e1.pipe(
192+
mergeMap(x => of(x)),
193+
switchMap(value => asInteropObservable(observableLookup[value])),
194+
mergeMap(x => of(x)),
195+
);
196+
197+
expectObservable(result, unsub).toBe(expected);
198+
expectSubscriptions(x.subscriptions).toBe(xsubs);
199+
expectSubscriptions(y.subscriptions).toBe(ysubs);
200+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
201+
});
202+
172203
it('should stop listening to a synchronous observable when unsubscribed', () => {
173204
const sideEffects: number[] = [];
174205
const synchronousObservable = concat(

‎spec/util/toSubscriber-spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe('toSubscriber', () => {
1212
expect(sub2.closed).to.be.true;
1313
});
1414

15-
it('should not be closed when other subscriber created with same observer instance completes', () => {
15+
it('should not be closed when other subscriber created with same observer instance completes', () => {
1616
let observer = {
1717
next: function () { /*noop*/ }
1818
};

‎src/internal/operators/catchError.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,13 @@ class CatchSubscriber<T, R> extends OuterSubscriber<T, T | R> {
153153
this._unsubscribeAndRecycle();
154154
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
155155
this.add(innerSubscriber);
156-
subscribeToResult(this, result, undefined, undefined, innerSubscriber);
156+
const innerSubscription = subscribeToResult(this, result, undefined, undefined, innerSubscriber);
157+
// The returned subscription will usually be the subscriber that was
158+
// passed. However, interop subscribers will be wrapped and for
159+
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
160+
if (innerSubscription !== innerSubscriber) {
161+
this.add(innerSubscription);
162+
}
157163
}
158164
}
159165
}

‎src/internal/operators/exhaustMap.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,16 @@ class ExhaustMapSubscriber<T, R> extends OuterSubscriber<T, R> {
122122
}
123123

124124
private _innerSub(result: ObservableInput<R>, value: T, index: number): void {
125-
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
125+
const innerSubscriber = new InnerSubscriber(this, value, index);
126126
const destination = this.destination as Subscription;
127127
destination.add(innerSubscriber);
128-
subscribeToResult<T, R>(this, result, value, index, innerSubscriber);
128+
const innerSubscription = subscribeToResult<T, R>(this, result, undefined, undefined, innerSubscriber);
129+
// The returned subscription will usually be the subscriber that was
130+
// passed. However, interop subscribers will be wrapped and for
131+
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
132+
if (innerSubscription !== innerSubscriber) {
133+
destination.add(innerSubscription);
134+
}
129135
}
130136

131137
protected _complete(): void {

‎src/internal/operators/mergeMap.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,16 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
142142
}
143143

144144
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
145-
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
145+
const innerSubscriber = new InnerSubscriber(this, value, index);
146146
const destination = this.destination as Subscription;
147147
destination.add(innerSubscriber);
148-
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
148+
const innerSubscription = subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
149+
// The returned subscription will usually be the subscriber that was
150+
// passed. However, interop subscribers will be wrapped and for
151+
// unsubscriptions to chain correctly, the wrapper needs to be added, too.
152+
if (innerSubscription !== innerSubscriber) {
153+
destination.add(innerSubscription);
154+
}
149155
}
150156

151157
protected _complete(): void {

0 commit comments

Comments
 (0)
Please sign in to comment.