Skip to content

Commit ba1ebb4

Browse files
mcollinarvagg
authored andcommitted
stream: correctly pause and resume after once('readable')
Fixes: #24281 PR-URL: #24366 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
1 parent 24acd53 commit ba1ebb4

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

lib/_stream_readable.js

+12-3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function ReadableState(options, stream, isDuplex) {
114114
this.emittedReadable = false;
115115
this.readableListening = false;
116116
this.resumeScheduled = false;
117+
this.paused = true;
117118

118119
// Should close be emitted on destroy. Defaults to true.
119120
this.emitClose = options.emitClose !== false;
@@ -862,10 +863,16 @@ Readable.prototype.removeAllListeners = function(ev) {
862863
};
863864

864865
function updateReadableListening(self) {
865-
self._readableState.readableListening = self.listenerCount('readable') > 0;
866+
const state = self._readableState;
867+
state.readableListening = self.listenerCount('readable') > 0;
866868

867-
// crude way to check if we should resume
868-
if (self.listenerCount('data') > 0) {
869+
if (state.resumeScheduled && !state.paused) {
870+
// flowing needs to be set to true now, otherwise
871+
// the upcoming resume will not flow.
872+
state.flowing = true;
873+
874+
// crude way to check if we should resume
875+
} else if (self.listenerCount('data') > 0) {
869876
self.resume();
870877
}
871878
}
@@ -887,6 +894,7 @@ Readable.prototype.resume = function() {
887894
state.flowing = !state.readableListening;
888895
resume(this, state);
889896
}
897+
state.paused = false;
890898
return this;
891899
};
892900

@@ -917,6 +925,7 @@ Readable.prototype.pause = function() {
917925
this._readableState.flowing = false;
918926
this.emit('pause');
919927
}
928+
this._readableState.paused = true;
920929
return this;
921930
};
922931

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const { Readable } = require('stream');
5+
6+
// This test verifies that a stream could be resumed after
7+
// removing the readable event in the same tick
8+
9+
check(new Readable({
10+
objectMode: true,
11+
highWaterMark: 1,
12+
read() {
13+
if (!this.first) {
14+
this.push('hello');
15+
this.first = true;
16+
return;
17+
}
18+
19+
this.push(null);
20+
}
21+
}));
22+
23+
function check(s) {
24+
const readableListener = common.mustNotCall();
25+
s.on('readable', readableListener);
26+
s.on('end', common.mustCall());
27+
s.removeListener('readable', readableListener);
28+
s.resume();
29+
}

0 commit comments

Comments
 (0)