@@ -26,29 +26,8 @@ const { toPathIfFileURL } = require('internal/url');
26
26
const kIoDone = Symbol ( 'kIoDone' ) ;
27
27
const kIsPerformingIO = Symbol ( 'kIsPerformingIO' ) ;
28
28
29
- const kMinPoolSpace = 128 ;
30
29
const kFs = Symbol ( 'kFs' ) ;
31
30
32
- let pool ;
33
- // It can happen that we expect to read a large chunk of data, and reserve
34
- // a large chunk of the pool accordingly, but the read() call only filled
35
- // a portion of it. If a concurrently executing read() then uses the same pool,
36
- // the "reserved" portion cannot be used, so we allow it to be re-used as a
37
- // new pool later.
38
- const poolFragments = [ ] ;
39
-
40
- function allocNewPool ( poolSize ) {
41
- if ( poolFragments . length > 0 )
42
- pool = poolFragments . pop ( ) ;
43
- else
44
- pool = Buffer . allocUnsafe ( poolSize ) ;
45
- pool . used = 0 ;
46
- }
47
-
48
- function roundUpToMultipleOf8 ( n ) {
49
- return ( n + 7 ) & ~ 7 ; // Align to 8 byte boundary.
50
- }
51
-
52
31
function _construct ( callback ) {
53
32
const stream = this ;
54
33
if ( typeof stream . fd === 'number' ) {
@@ -188,70 +167,51 @@ ReadStream.prototype.open = openReadFs;
188
167
ReadStream . prototype . _construct = _construct ;
189
168
190
169
ReadStream . prototype . _read = function ( n ) {
191
- if ( ! pool || pool . length - pool . used < kMinPoolSpace ) {
192
- // Discard the old pool.
193
- allocNewPool ( this . readableHighWaterMark ) ;
194
- }
170
+ n = this . pos !== undefined ?
171
+ MathMin ( this . end - this . pos + 1 , n ) :
172
+ MathMin ( this . end - this . bytesRead + 1 , n ) ;
195
173
196
- // Grab another reference to the pool in the case that while we're
197
- // in the thread pool another read() finishes up the pool, and
198
- // allocates a new one.
199
- const thisPool = pool ;
200
- let toRead = MathMin ( pool . length - pool . used , n ) ;
201
- const start = pool . used ;
202
-
203
- if ( this . pos !== undefined )
204
- toRead = MathMin ( this . end - this . pos + 1 , toRead ) ;
205
- else
206
- toRead = MathMin ( this . end - this . bytesRead + 1 , toRead ) ;
174
+ if ( n <= 0 ) {
175
+ this . push ( null ) ;
176
+ return ;
177
+ }
207
178
208
- // Already read everything we were supposed to read!
209
- // treat as EOF.
210
- if ( toRead <= 0 )
211
- return this . push ( null ) ;
179
+ const buf = Buffer . allocUnsafeSlow ( n ) ;
212
180
213
- // the actual read.
214
181
this [ kIsPerformingIO ] = true ;
215
182
this [ kFs ]
216
- . read ( this . fd , pool , pool . used , toRead , this . pos , ( er , bytesRead ) => {
183
+ . read ( this . fd , buf , 0 , n , this . pos , ( er , bytesRead , buf ) => {
217
184
this [ kIsPerformingIO ] = false ;
185
+
218
186
// Tell ._destroy() that it's safe to close the fd now.
219
- if ( this . destroyed ) return this . emit ( kIoDone , er ) ;
187
+ if ( this . destroyed ) {
188
+ this . emit ( kIoDone , er ) ;
189
+ return ;
190
+ }
220
191
221
192
if ( er ) {
222
193
errorOrDestroy ( this , er ) ;
223
- } else {
224
- let b = null ;
225
- // Now that we know how much data we have actually read, re-wind the
226
- // 'used' field if we can, and otherwise allow the remainder of our
227
- // reservation to be used as a new pool later.
228
- if ( start + toRead === thisPool . used && thisPool === pool ) {
229
- const newUsed = thisPool . used + bytesRead - toRead ;
230
- thisPool . used = roundUpToMultipleOf8 ( newUsed ) ;
231
- } else {
232
- // Round down to the next lowest multiple of 8 to ensure the new pool
233
- // fragment start and end positions are aligned to an 8 byte boundary.
234
- const alignedEnd = ( start + toRead ) & ~ 7 ;
235
- const alignedStart = roundUpToMultipleOf8 ( start + bytesRead ) ;
236
- if ( alignedEnd - alignedStart >= kMinPoolSpace ) {
237
- poolFragments . push ( thisPool . slice ( alignedStart , alignedEnd ) ) ;
238
- }
239
- }
240
-
241
- if ( bytesRead > 0 ) {
242
- this . bytesRead += bytesRead ;
243
- b = thisPool . slice ( start , start + bytesRead ) ;
194
+ } else if ( bytesRead > 0 ) {
195
+ this . bytesRead += bytesRead ;
196
+
197
+ if ( bytesRead !== buf . length ) {
198
+ // Slow path. Shrink to fit.
199
+ // Copy instead of slice so that we don't retain
200
+ // large backing buffer for small reads.
201
+ const dst = Buffer . allocUnsafeSlow ( bytesRead ) ;
202
+ buf . copy ( dst , 0 , 0 , bytesRead ) ;
203
+ buf = dst ;
244
204
}
245
205
246
- this . push ( b ) ;
206
+ this . push ( buf ) ;
207
+ } else {
208
+ this . push ( null ) ;
247
209
}
248
210
} ) ;
249
211
250
- // Move the pool positions, and internal position for reading.
251
- if ( this . pos !== undefined )
252
- this . pos += toRead ;
253
-
254
- pool . used = roundUpToMultipleOf8 ( pool . used + toRead ) ;
212
+ if ( this . pos !== undefined ) {
213
+ this . pos += n ;
214
+ }
255
215
} ;
256
216
257
217
ReadStream . prototype . _destroy = function ( err , cb ) {
0 commit comments