Skip to content

Commit aef9578

Browse files
staltzbenlesh
authored andcommitted
fix(ConnectableObservable): fix ConnectableObservable connectability and refCounting
When the ConnectableObservable with refCount always shares the same instance of the underlying subject (such as in publish, publishReplay, publishBehavior), the subscription to the connectable observable should NOT incur additional subscriptions to the underlying cold source. See how tests for publish/publishBehavior/publishReplay were updated to assert that only one subscription to the underlying cold source happens, not multiple, because as soon as the multicasting subject raises an error, this error impedes subsequent subscriptions to the cold source from happening. Fix ConnectableObservable, its connect() method, and the RefCountObservable to support synchronous retry/repeat in the presence of multiple subscribers, and to support retry/repeat in other asynchronous scenarios. Resolves bug #678.
1 parent d797b4f commit aef9578

File tree

4 files changed

+81
-30
lines changed

4 files changed

+81
-30
lines changed

spec/operators/publish-spec.js

+2-7
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,7 @@ describe('Observable.prototype.publish()', function () {
132132

133133
it('should NOT be retryable', function () {
134134
var source = cold('-1-2-3----4-#');
135-
var sourceSubs = ['^ !',
136-
' (^!)',
137-
' (^!)',
138-
' (^!)'];
135+
var sourceSubs = '^ !';
139136
var published = source.publish().refCount().retry(3);
140137
var subscriber1 = hot('a| ').mergeMapTo(published);
141138
var expected1 = '-1-2-3----4-#';
@@ -152,9 +149,7 @@ describe('Observable.prototype.publish()', function () {
152149

153150
it('should NOT be repeatable', function () {
154151
var source = cold('-1-2-3----4-|');
155-
var sourceSubs = ['^ !',
156-
' (^!)',
157-
' (^!)'];
152+
var sourceSubs = '^ !';
158153
var published = source.publish().refCount().repeat(3);
159154
var subscriber1 = hot('a| ').mergeMapTo(published);
160155
var expected1 = '-1-2-3----4-|';

spec/operators/publishBehavior-spec.js

+2-7
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ describe('Observable.prototype.publishBehavior()', function () {
131131

132132
it('should NOT be retryable', function () {
133133
var source = cold('-1-2-3----4-#');
134-
var sourceSubs = ['^ !',
135-
' (^!)',
136-
' (^!)',
137-
' (^!)'];
134+
var sourceSubs = '^ !';
138135
var published = source.publishBehavior('0').refCount().retry(3);
139136
var subscriber1 = hot('a| ').mergeMapTo(published);
140137
var expected1 = '01-2-3----4-#';
@@ -151,9 +148,7 @@ describe('Observable.prototype.publishBehavior()', function () {
151148

152149
it('should NOT be repeatable', function () {
153150
var source = cold('-1-2-3----4-|');
154-
var sourceSubs = ['^ !',
155-
' (^!)',
156-
' (^!)'];
151+
var sourceSubs = '^ !';
157152
var published = source.publishBehavior('0').refCount().repeat(3);
158153
var subscriber1 = hot('a| ').mergeMapTo(published);
159154
var expected1 = '01-2-3----4-|';

spec/operators/publishReplay-spec.js

+2-7
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,7 @@ describe('Observable.prototype.publishReplay()', function () {
150150

151151
it('should NOT be retryable', function () {
152152
var source = cold('-1-2-3----4-#');
153-
var sourceSubs = ['^ !',
154-
' (^!)',
155-
' (^!)',
156-
' (^!)'];
153+
var sourceSubs = '^ !';
157154
var published = source.publishReplay(1).refCount().retry(3);
158155
var subscriber1 = hot('a| ').mergeMapTo(published);
159156
var expected1 = '-1-2-3----4-(444#)';
@@ -170,9 +167,7 @@ describe('Observable.prototype.publishReplay()', function () {
170167

171168
it('should NOT be repeatable', function () {
172169
var source = cold('-1-2-3----4-|');
173-
var sourceSubs = ['^ !',
174-
' (^!)',
175-
' (^!)'];
170+
var sourceSubs = '^ !';
176171
var published = source.publishReplay(1).refCount().repeat(3);
177172
var subscriber1 = hot('a| ').mergeMapTo(published);
178173
var expected1 = '-1-2-3----4-(44|)';

src/observables/ConnectableObservable.ts

+75-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {Subject} from '../Subject';
22
import {Observable} from '../Observable';
33
import {Subscription} from '../Subscription';
4+
import {Subscriber} from '../Subscriber';
45

56
export class ConnectableObservable<T> extends Observable<T> {
67

@@ -24,7 +25,16 @@ export class ConnectableObservable<T> extends Observable<T> {
2425
return (this.subject = this.subjectFactory());
2526
}
2627

27-
connect() {
28+
connect(onSubscribe?: (subscription: Subscription<T>) => void): Subscription<T> {
29+
if (onSubscribe) {
30+
this._callbackConnect(onSubscribe);
31+
return null;
32+
} else {
33+
return this._returningConnect();
34+
}
35+
}
36+
37+
_returningConnect(): Subscription<T> {
2838
const source = this.source;
2939
let subscription = this.subscription;
3040
if (subscription && !subscription.isUnsubscribed) {
@@ -35,6 +45,26 @@ export class ConnectableObservable<T> extends Observable<T> {
3545
return (this.subscription = subscription);
3646
}
3747

48+
/**
49+
* Instructs the ConnectableObservable to begin emitting the items from its
50+
* underlying source to its Subscribers.
51+
*
52+
* @param onSubscribe a function that receives the connection subscription
53+
* before the subscription to source happens, allowing the caller to
54+
* synchronously disconnect a synchronous source.
55+
*/
56+
_callbackConnect(onSubscribe: (subscription: Subscription<T>) => void): void {
57+
let subscription = this.subscription;
58+
if (subscription && !subscription.isUnsubscribed) {
59+
onSubscribe(subscription);
60+
return;
61+
}
62+
this.subscription = subscription = new Subscription();
63+
onSubscribe(subscription);
64+
subscription.add(this.source.subscribe(this._getSubject()));
65+
subscription.add(new ConnectableSubscription(this));
66+
}
67+
3868
refCount(): Observable<T> {
3969
return new RefCountObservable(this);
4070
}
@@ -62,24 +92,60 @@ class RefCountObservable<T> extends Observable<T> {
6292

6393
_subscribe(subscriber) {
6494
const connectable = this.connectable;
65-
const subscription = connectable.subscribe(subscriber);
66-
if (++this.refCount === 1) {
67-
this.connection = connectable.connect();
95+
const refCountSubscriber = new RefCountSubscriber(subscriber, this);
96+
refCountSubscriber.myConnection = this.connection;
97+
const subscription = connectable.subscribe(refCountSubscriber);
98+
99+
if (!subscription.isUnsubscribed && ++this.refCount === 1) {
100+
connectable.connect(_subscription => {
101+
refCountSubscriber.myConnection = this.connection = _subscription;
102+
});
68103
}
69-
subscription.add(new RefCountSubscription(this));
70104
return subscription;
71105
}
72106
}
73107

74-
class RefCountSubscription<T> extends Subscription<T> {
108+
class RefCountSubscriber<T> extends Subscriber<T> {
109+
myConnection: Subscription<T>;
75110

76-
constructor(private refCountObservable: RefCountObservable<T>) {
77-
super();
111+
constructor(public destination: Subscriber<T>,
112+
private refCountObservable: RefCountObservable<T>) {
113+
super(null);
114+
destination.add(this);
115+
}
116+
117+
_next(value: T) {
118+
this.destination.next(value);
119+
}
120+
121+
_error(err: any) {
122+
this._resetConnectable();
123+
this.destination.error(err);
124+
}
125+
126+
_complete() {
127+
this._resetConnectable();
128+
this.destination.complete();
129+
}
130+
131+
_resetConnectable() {
132+
const observable = this.refCountObservable;
133+
const myConnection = this.myConnection;
134+
if (myConnection && myConnection === observable.connection) {
135+
observable.refCount = 0;
136+
observable.connection.unsubscribe();
137+
observable.connection = void 0;
138+
this.unsubscribe();
139+
}
78140
}
79141

80142
_unsubscribe() {
81143
const observable = this.refCountObservable;
82-
if (--observable.refCount === 0) {
144+
if (observable.refCount === 0) {
145+
return;
146+
}
147+
const myConnection = this.myConnection;
148+
if (--observable.refCount === 0 && myConnection && myConnection === observable.connection) {
83149
observable.connection.unsubscribe();
84150
observable.connection = void 0;
85151
}

0 commit comments

Comments
 (0)