Skip to content

Commit 2271a91

Browse files
authored
fix(share): propagate closed to firehose sources (#6370)
* test: enable firehose test for share * fix: add teardown directly to subscriber * chore: add comment * chore: remove firehose tests for deprecated ops * refactor: move it up even further
1 parent abf2bc1 commit 2271a91

File tree

5 files changed

+21
-101
lines changed

5 files changed

+21
-101
lines changed

spec/operators/multicast-spec.ts

-43
Original file line numberDiff line numberDiff line change
@@ -821,47 +821,4 @@ describe('multicast', () => {
821821
);
822822
});
823823
});
824-
825-
// TODO: fix firehose unsubscription
826-
// AFAICT, it's not possible for multicast observables to support ASAP
827-
// unsubscription from synchronous firehose sources. The problem is that the
828-
// chaining of the closed 'signal' is broken by the subject. For example,
829-
// here:
830-
//
831-
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/operators/multicast.ts#L53
832-
//
833-
// The subject is passed to subscribe. However, in the subscribe
834-
// implementation a SafeSubscriber is created with the subject as the
835-
// observer:
836-
//
837-
// https://github.com/ReactiveX/rxjs/blob/2d5e4d5bd7b684a912485e1c1583ba3d41c8308e/src/internal/Observable.ts#L210
838-
//
839-
// That breaks the chaining of closed - i.e. even if the unsubscribe is
840-
// called on the subject, closing it, the SafeSubscriber's closed property
841-
// won't reflect that.
842-
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
843-
const sideEffects: number[] = [];
844-
const synchronousObservable = new Observable<number>((subscriber) => {
845-
// This will check to see if the subscriber was closed on each loop
846-
// when the unsubscribe hits (from the `take`), it should be closed
847-
for (let i = 0; !subscriber.closed && i < 10; i++) {
848-
sideEffects.push(i);
849-
subscriber.next(i);
850-
}
851-
});
852-
853-
synchronousObservable
854-
.pipe(
855-
multicast(
856-
() => new Subject<number>(),
857-
(source) => source
858-
),
859-
take(3)
860-
)
861-
.subscribe(() => {
862-
/* noop */
863-
});
864-
865-
expect(sideEffects).to.deep.equal([0, 1, 2]);
866-
});
867824
});

spec/operators/refCount-spec.ts

-21
Original file line numberDiff line numberDiff line change
@@ -114,25 +114,4 @@ describe('refCount', () => {
114114
expect(arr[0]).to.equal('the number one');
115115
expect(arr[1]).to.equal('the number two');
116116
});
117-
118-
// TODO: fix firehose unsubscription
119-
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
120-
const sideEffects: number[] = [];
121-
const synchronousObservable = new Observable<number>(subscriber => {
122-
// This will check to see if the subscriber was closed on each loop
123-
// when the unsubscribe hits (from the `take`), it should be closed
124-
for (let i = 0; !subscriber.closed && i < 10; i++) {
125-
sideEffects.push(i);
126-
subscriber.next(i);
127-
}
128-
});
129-
130-
synchronousObservable.pipe(
131-
multicast(() => new Subject<number>()),
132-
refCount(),
133-
take(3),
134-
).subscribe(() => { /* noop */ });
135-
136-
expect(sideEffects).to.deep.equal([0, 1, 2]);
137-
});
138117
});

spec/operators/share-spec.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,7 @@ describe('share', () => {
427427
});
428428
});
429429

430-
// TODO: fix firehose unsubscription
431-
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
430+
it('should stop listening to a synchronous observable when unsubscribed', () => {
432431
const sideEffects: number[] = [];
433432
const synchronousObservable = new Observable<number>((subscriber) => {
434433
// This will check to see if the subscriber was closed on each loop

spec/operators/shareReplay-spec.ts

-19
Original file line numberDiff line numberDiff line change
@@ -347,25 +347,6 @@ describe('shareReplay', () => {
347347
});
348348
});
349349

350-
// TODO: fix firehose unsubscription
351-
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
352-
const sideEffects: number[] = [];
353-
const synchronousObservable = new Observable<number>((subscriber) => {
354-
// This will check to see if the subscriber was closed on each loop
355-
// when the unsubscribe hits (from the `take`), it should be closed
356-
for (let i = 0; !subscriber.closed && i < 10; i++) {
357-
sideEffects.push(i);
358-
subscriber.next(i);
359-
}
360-
});
361-
362-
synchronousObservable.pipe(shareReplay(), take(3)).subscribe(() => {
363-
/* noop */
364-
});
365-
366-
expect(sideEffects).to.deep.equal([0, 1, 2]);
367-
});
368-
369350
const FinalizationRegistry = (global as any).FinalizationRegistry;
370351
if (FinalizationRegistry) {
371352
it('should not leak the subscriber for sync sources', (done) => {

src/internal/operators/share.ts

+20-16
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,26 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
112112
// Create the subject if we don't have one yet.
113113
subject = subject ?? connector();
114114

115+
// Add the teardown directly to the subscriber - instead of returning it -
116+
// so that the handling of the subscriber's unsubscription will be wired
117+
// up _before_ the subscription to the source occurs. This is done so that
118+
// the assignment to the source connection's `closed` property will be seen
119+
// by synchronous firehose sources.
120+
subscriber.add(() => {
121+
refCount--;
122+
123+
// If we're resetting on refCount === 0, and it's 0, we only want to do
124+
// that on "unsubscribe", really. Resetting on error or completion is a different
125+
// configuration.
126+
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
127+
// We need to capture the connection before
128+
// we reset (if we need to reset).
129+
const conn = connection;
130+
reset();
131+
conn?.unsubscribe();
132+
}
133+
});
134+
115135
// The following line adds the subscription to the subscriber passed.
116136
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
117137
subject.subscribe(subscriber);
@@ -147,21 +167,5 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
147167
});
148168
from(source).subscribe(connection);
149169
}
150-
151-
// This is also added to `subscriber`, technically.
152-
return () => {
153-
refCount--;
154-
155-
// If we're resetting on refCount === 0, and it's 0, we only want to do
156-
// that on "unsubscribe", really. Resetting on error or completion is a different
157-
// configuration.
158-
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
159-
// We need to capture the connection before
160-
// we reset (if we need to reset).
161-
const conn = connection;
162-
reset();
163-
conn?.unsubscribe();
164-
}
165-
};
166170
});
167171
}

0 commit comments

Comments
 (0)