@@ -118,6 +118,7 @@ const kHasFlowing = 1 << 23;
118
118
const kFlowing = 1 << 24 ;
119
119
const kHasPaused = 1 << 25 ;
120
120
const kPaused = 1 << 26 ;
121
+ const kDataListening = 1 << 27 ;
121
122
122
123
// TODO(benjamingr) it is likely slower to do it this way than with free functions
123
124
function makeBitMapDescriptor ( bit ) {
@@ -527,8 +528,7 @@ function canPushMore(state) {
527
528
}
528
529
529
530
function addChunk ( stream , state , chunk , addToFront ) {
530
- if ( ( state [ kState ] & ( kFlowing | kSync ) ) === kFlowing && state . length === 0 &&
531
- stream . listenerCount ( 'data' ) > 0 ) {
531
+ if ( ( state [ kState ] & ( kFlowing | kSync | kDataListening ) ) === ( kFlowing | kDataListening ) && state . length === 0 ) {
532
532
// Use the guard to avoid creating `Set()` repeatedly
533
533
// when we have multiple pipes.
534
534
if ( ( state [ kState ] & kMultiAwaitDrain ) !== 0 ) {
@@ -1062,7 +1062,7 @@ function pipeOnDrain(src, dest) {
1062
1062
}
1063
1063
1064
1064
if ( ( ! state . awaitDrainWriters || state . awaitDrainWriters . size === 0 ) &&
1065
- src . listenerCount ( 'data' ) ) {
1065
+ ( state [ kState ] & kDataListening ) !== 0 ) {
1066
1066
src . resume ( ) ;
1067
1067
}
1068
1068
} ;
@@ -1109,6 +1109,8 @@ Readable.prototype.on = function(ev, fn) {
1109
1109
const state = this . _readableState ;
1110
1110
1111
1111
if ( ev === 'data' ) {
1112
+ state [ kState ] |= kDataListening ;
1113
+
1112
1114
// Update readableListening so that resume() may be a no-op
1113
1115
// a few lines down. This is needed to support once('readable').
1114
1116
state [ kState ] |= this . listenerCount ( 'readable' ) > 0 ? kReadableListening : 0 ;
@@ -1135,6 +1137,8 @@ Readable.prototype.on = function(ev, fn) {
1135
1137
Readable . prototype . addListener = Readable . prototype . on ;
1136
1138
1137
1139
Readable . prototype . removeListener = function ( ev , fn ) {
1140
+ const state = this . _readableState ;
1141
+
1138
1142
const res = Stream . prototype . removeListener . call ( this ,
1139
1143
ev , fn ) ;
1140
1144
@@ -1146,6 +1150,8 @@ Readable.prototype.removeListener = function(ev, fn) {
1146
1150
// resume within the same tick will have no
1147
1151
// effect.
1148
1152
process . nextTick ( updateReadableListening , this ) ;
1153
+ } else if ( ev === 'data' && this . listenerCount ( 'data' ) === 0 ) {
1154
+ state [ kState ] &= ~ kDataListening ;
1149
1155
}
1150
1156
1151
1157
return res ;
@@ -1184,7 +1190,7 @@ function updateReadableListening(self) {
1184
1190
state [ kState ] |= kHasFlowing | kFlowing ;
1185
1191
1186
1192
// Crude way to check if we should resume.
1187
- } else if ( self . listenerCount ( 'data' ) > 0 ) {
1193
+ } else if ( ( state [ kState ] & kDataListening ) !== 0 ) {
1188
1194
self . resume ( ) ;
1189
1195
} else if ( ( state [ kState ] & kReadableListening ) === 0 ) {
1190
1196
state [ kState ] &= ~ ( kHasFlowing | kFlowing ) ;
0 commit comments