Skip to content

Commit 945f7a0

Browse files
Merge pull request #13453 from zhengjitf/fix-11601
fix(core): possible memory leak when using server side events
2 parents e3a47b6 + f797e16 commit 945f7a0

File tree

3 files changed

+72
-3
lines changed

3 files changed

+72
-3
lines changed

packages/core/router/router-response-controller.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
import { isObject } from '@nestjs/common/utils/shared.utils';
99
import { IncomingMessage } from 'http';
1010
import { EMPTY, lastValueFrom, Observable, isObservable } from 'rxjs';
11-
import { catchError, debounce, map } from 'rxjs/operators';
11+
import { catchError, concatMap, map } from 'rxjs/operators';
1212
import {
1313
AdditionalHeaders,
1414
WritableHeaderStream,
@@ -128,7 +128,7 @@ export class RouterResponseController {
128128

129129
return { data: message as object | string };
130130
}),
131-
debounce(
131+
concatMap(
132132
message =>
133133
new Promise<void>(resolve =>
134134
stream.writeMessage(message, () => resolve()),
@@ -153,6 +153,9 @@ export class RouterResponseController {
153153

154154
request.on('close', () => {
155155
subscription.unsubscribe();
156+
if (!stream.writableEnded) {
157+
stream.end();
158+
}
156159
});
157160
}
158161

packages/core/router/sse-stream.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export class SseStream extends Transform {
116116
message.id = this.lastEventId.toString();
117117
}
118118

119-
if (!this.write(message, 'utf-8', cb)) {
119+
if (!this.write(message, 'utf-8')) {
120120
this.once('drain', cb);
121121
} else {
122122
process.nextTick(cb);

packages/core/test/router/router-response-controller.spec.ts

+66
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { PassThrough, Writable } from 'stream';
77
import { HttpStatus, RequestMethod } from '../../../common';
88
import { RouterResponseController } from '../../router/router-response-controller';
99
import { NoopHttpAdapter } from '../utils/noop-adapter.spec';
10+
import { SseStream } from '../../router/sse-stream';
1011

1112
describe('RouterResponseController', () => {
1213
let adapter: NoopHttpAdapter;
@@ -374,6 +375,71 @@ data: test
374375
done();
375376
});
376377

378+
describe('when writing data too densely', () => {
379+
const DEFAULT_MAX_LISTENERS = SseStream.defaultMaxListeners;
380+
const MAX_LISTENERS = 1;
381+
const sandbox = sinon.createSandbox();
382+
383+
beforeEach(() => {
384+
// Can't access to the internal sseStream,
385+
// as a workround, set `defaultMaxListeners` of `SseStream` and reset the max listeners of `process`
386+
const PROCESS_MAX_LISTENERS = process.getMaxListeners();
387+
SseStream.defaultMaxListeners = MAX_LISTENERS;
388+
process.setMaxListeners(PROCESS_MAX_LISTENERS);
389+
390+
const sseStream = sinon.createStubInstance(SseStream);
391+
const originalWrite = SseStream.prototype.write;
392+
// Make `.write()` always return false, so as to listen `drain` event
393+
sseStream.write.callsFake(function (...args: any[]) {
394+
originalWrite.apply(this, args);
395+
return false;
396+
});
397+
sandbox.replace(SseStream.prototype, 'write', sseStream.write);
398+
});
399+
400+
afterEach(() => {
401+
sandbox.restore();
402+
SseStream.defaultMaxListeners = DEFAULT_MAX_LISTENERS;
403+
});
404+
405+
it('should not cause memory leak', async () => {
406+
let maxDrainListenersExceededWarning = null;
407+
process.on('warning', (warning: any) => {
408+
if (
409+
warning.name === 'MaxListenersExceededWarning' &&
410+
warning.emitter instanceof SseStream &&
411+
warning.type === 'drain' &&
412+
warning.count === MAX_LISTENERS + 1
413+
) {
414+
maxDrainListenersExceededWarning = warning;
415+
}
416+
});
417+
418+
const result = new Subject();
419+
420+
const response = new Writable();
421+
response._write = () => {};
422+
423+
const request = new Writable();
424+
request._write = () => {};
425+
426+
routerResponseController.sse(
427+
result,
428+
response as unknown as ServerResponse,
429+
request as unknown as IncomingMessage,
430+
);
431+
432+
// Send multiple messages simultaneously
433+
Array.from({ length: MAX_LISTENERS + 1 }).forEach((_, i) =>
434+
result.next(String(i)),
435+
);
436+
437+
await new Promise(resolve => process.nextTick(resolve));
438+
439+
expect(maxDrainListenersExceededWarning).to.equal(null);
440+
});
441+
});
442+
377443
describe('when there is an error', () => {
378444
it('should close the request', done => {
379445
const result = new Subject();

0 commit comments

Comments
 (0)