@@ -28,6 +28,7 @@ const {
28
28
ObjectDefineProperty,
29
29
ObjectSetPrototypeOf,
30
30
SymbolAsyncIterator,
31
+ Symbol
31
32
} = primordials ;
32
33
33
34
module . exports = Readable ;
@@ -51,6 +52,8 @@ const {
51
52
ERR_STREAM_UNSHIFT_AFTER_END_EVENT
52
53
} = require ( 'internal/errors' ) . codes ;
53
54
55
+ const kPaused = Symbol ( 'kPaused' ) ;
56
+
54
57
// Lazy loaded to improve the startup performance.
55
58
let StringDecoder ;
56
59
let createReadableStreamAsyncIterator ;
@@ -126,7 +129,7 @@ function ReadableState(options, stream, isDuplex) {
126
129
this . emittedReadable = false ;
127
130
this . readableListening = false ;
128
131
this . resumeScheduled = false ;
129
- this . paused = true ;
132
+ this [ kPaused ] = null ;
130
133
131
134
// True if the error was already emitted and should not be thrown again
132
135
this . errorEmitted = false ;
@@ -173,6 +176,16 @@ ObjectDefineProperty(ReadableState.prototype, 'pipesCount', {
173
176
}
174
177
} ) ;
175
178
179
+ // Legacy property for `paused`
180
+ ObjectDefineProperty ( ReadableState . prototype , 'paused' , {
181
+ get ( ) {
182
+ return this [ kPaused ] !== false ;
183
+ } ,
184
+ set ( value ) {
185
+ this [ kPaused ] = ! ! value ;
186
+ }
187
+ } ) ;
188
+
176
189
function Readable ( options ) {
177
190
if ( ! ( this instanceof Readable ) )
178
191
return new Readable ( options ) ;
@@ -368,7 +381,8 @@ function chunkInvalid(state, chunk) {
368
381
369
382
370
383
Readable . prototype . isPaused = function ( ) {
371
- return this . _readableState . flowing === false ;
384
+ const state = this . _readableState ;
385
+ return state [ kPaused ] === true || state . flowing === false ;
372
386
} ;
373
387
374
388
// Backwards compatibility.
@@ -967,14 +981,16 @@ function updateReadableListening(self) {
967
981
const state = self . _readableState ;
968
982
state . readableListening = self . listenerCount ( 'readable' ) > 0 ;
969
983
970
- if ( state . resumeScheduled && ! state . paused ) {
984
+ if ( state . resumeScheduled && state [ kPaused ] === false ) {
971
985
// Flowing needs to be set to true now, otherwise
972
986
// the upcoming resume will not flow.
973
987
state . flowing = true ;
974
988
975
989
// Crude way to check if we should resume
976
990
} else if ( self . listenerCount ( 'data' ) > 0 ) {
977
991
self . resume ( ) ;
992
+ } else if ( ! state . readableListening ) {
993
+ state . flowing = null ;
978
994
}
979
995
}
980
996
@@ -995,7 +1011,7 @@ Readable.prototype.resume = function() {
995
1011
state . flowing = ! state . readableListening ;
996
1012
resume ( this , state ) ;
997
1013
}
998
- state . paused = false ;
1014
+ state [ kPaused ] = false ;
999
1015
return this ;
1000
1016
} ;
1001
1017
@@ -1026,7 +1042,7 @@ Readable.prototype.pause = function() {
1026
1042
this . _readableState . flowing = false ;
1027
1043
this . emit ( 'pause' ) ;
1028
1044
}
1029
- this . _readableState . paused = true ;
1045
+ this . _readableState [ kPaused ] = true ;
1030
1046
return this ;
1031
1047
} ;
1032
1048
0 commit comments