@@ -80,17 +80,16 @@ var _require$codes = require('../errors').codes,
80
80
ERR_INVALID_ARG_TYPE = _require$codes . ERR_INVALID_ARG_TYPE ,
81
81
ERR_STREAM_PUSH_AFTER_EOF = _require$codes . ERR_STREAM_PUSH_AFTER_EOF ,
82
82
ERR_METHOD_NOT_IMPLEMENTED = _require$codes . ERR_METHOD_NOT_IMPLEMENTED ,
83
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes . ERR_STREAM_UNSHIFT_AFTER_END_EVENT ;
84
-
85
- var _require2 = require ( '../experimentalWarning' ) ,
86
- emitExperimentalWarning = _require2 . emitExperimentalWarning ; // Lazy loaded to improve the startup performance.
83
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes . ERR_STREAM_UNSHIFT_AFTER_END_EVENT ; // Lazy loaded to improve the startup performance.
87
84
88
85
89
86
var StringDecoder ;
90
87
var createReadableStreamAsyncIterator ;
88
+ var from ;
91
89
92
90
require ( 'inherits' ) ( Readable , Stream ) ;
93
91
92
+ var errorOrDestroy = destroyImpl . errorOrDestroy ;
94
93
var kProxyEvents = [ 'error' , 'close' , 'destroy' , 'pause' , 'resume' ] ;
95
94
96
95
function prependListener ( emitter , event , fn ) {
@@ -144,7 +143,9 @@ function ReadableState(options, stream, isDuplex) {
144
143
this . resumeScheduled = false ;
145
144
this . paused = true ; // Should close be emitted on destroy. Defaults to true.
146
145
147
- this . emitClose = options . emitClose !== false ; // has it been destroyed
146
+ this . emitClose = options . emitClose !== false ; // Should .destroy() be called after 'end' (and potentially 'finish')
147
+
148
+ this . autoDestroy = ! ! options . autoDestroy ; // has it been destroyed
148
149
149
150
this . destroyed = false ; // Crypto is kind of old and crusty. Historically, its default string
150
151
// encoding is 'binary' so we have to make this configurable.
@@ -257,16 +258,16 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
257
258
if ( ! skipChunkCheck ) er = chunkInvalid ( state , chunk ) ;
258
259
259
260
if ( er ) {
260
- stream . emit ( 'error' , er ) ;
261
+ errorOrDestroy ( stream , er ) ;
261
262
} else if ( state . objectMode || chunk && chunk . length > 0 ) {
262
263
if ( typeof chunk !== 'string' && ! state . objectMode && Object . getPrototypeOf ( chunk ) !== Buffer . prototype ) {
263
264
chunk = _uint8ArrayToBuffer ( chunk ) ;
264
265
}
265
266
266
267
if ( addToFront ) {
267
- if ( state . endEmitted ) stream . emit ( 'error' , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ; else addChunk ( stream , state , chunk , true ) ;
268
+ if ( state . endEmitted ) errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ; else addChunk ( stream , state , chunk , true ) ;
268
269
} else if ( state . ended ) {
269
- stream . emit ( 'error' , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
270
+ errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
270
271
} else if ( state . destroyed ) {
271
272
return false ;
272
273
} else {
@@ -322,17 +323,32 @@ Readable.prototype.isPaused = function () {
322
323
323
324
Readable . prototype . setEncoding = function ( enc ) {
324
325
if ( ! StringDecoder ) StringDecoder = require ( 'string_decoder/' ) . StringDecoder ;
325
- this . _readableState . decoder = new StringDecoder ( enc ) ; // if setEncoding(null), decoder.encoding equals utf8
326
+ var decoder = new StringDecoder ( enc ) ;
327
+ this . _readableState . decoder = decoder ; // If setEncoding(null), decoder.encoding equals utf8
328
+
329
+ this . _readableState . encoding = this . _readableState . decoder . encoding ; // Iterate over current buffer to convert already stored Buffers:
330
+
331
+ var p = this . _readableState . buffer . head ;
332
+ var content = '' ;
333
+
334
+ while ( p !== null ) {
335
+ content += decoder . write ( p . data ) ;
336
+ p = p . next ;
337
+ }
338
+
339
+ this . _readableState . buffer . clear ( ) ;
326
340
327
- this . _readableState . encoding = this . _readableState . decoder . encoding ;
341
+ if ( content !== '' ) this . _readableState . buffer . push ( content ) ;
342
+ this . _readableState . length = content . length ;
328
343
return this ;
329
- } ; // Don't raise the hwm > 8MB
344
+ } ; // Don't raise the hwm > 1GB
330
345
331
346
332
- var MAX_HWM = 0x800000 ;
347
+ var MAX_HWM = 0x40000000 ;
333
348
334
349
function computeNewHighWaterMark ( n ) {
335
350
if ( n >= MAX_HWM ) {
351
+ // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
336
352
n = MAX_HWM ;
337
353
} else {
338
354
// Get the next highest power of 2 to prevent increasing hwm excessively in
@@ -449,7 +465,7 @@ Readable.prototype.read = function (n) {
449
465
if ( n > 0 ) ret = fromList ( n , state ) ; else ret = null ;
450
466
451
467
if ( ret === null ) {
452
- state . needReadable = true ;
468
+ state . needReadable = state . length <= state . highWaterMark ;
453
469
n = 0 ;
454
470
} else {
455
471
state . length -= n ;
@@ -469,6 +485,7 @@ Readable.prototype.read = function (n) {
469
485
} ;
470
486
471
487
function onEofChunk ( stream , state ) {
488
+ debug ( 'onEofChunk' ) ;
472
489
if ( state . ended ) return ;
473
490
474
491
if ( state . decoder ) {
@@ -503,6 +520,7 @@ function onEofChunk(stream, state) {
503
520
504
521
function emitReadable ( stream ) {
505
522
var state = stream . _readableState ;
523
+ debug ( 'emitReadable' , state . needReadable , state . emittedReadable ) ;
506
524
state . needReadable = false ;
507
525
508
526
if ( ! state . emittedReadable ) {
@@ -518,6 +536,7 @@ function emitReadable_(stream) {
518
536
519
537
if ( ! state . destroyed && ( state . length || state . ended ) ) {
520
538
stream . emit ( 'readable' ) ;
539
+ state . emittedReadable = false ;
521
540
} // The stream needs another readable event if
522
541
// 1. It is not flowing, as the flow mechanism will take
523
542
// care of it.
@@ -583,7 +602,7 @@ function maybeReadMore_(stream, state) {
583
602
584
603
585
604
Readable . prototype . _read = function ( n ) {
586
- this . emit ( 'error' , new ERR_METHOD_NOT_IMPLEMENTED ( '_read()' ) ) ;
605
+ errorOrDestroy ( this , new ERR_METHOD_NOT_IMPLEMENTED ( '_read()' ) ) ;
587
606
} ;
588
607
589
608
Readable . prototype . pipe = function ( dest , pipeOpts ) {
@@ -682,7 +701,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
682
701
debug ( 'onerror' , er ) ;
683
702
unpipe ( ) ;
684
703
dest . removeListener ( 'error' , onerror ) ;
685
- if ( EElistenerCount ( dest , 'error' ) === 0 ) dest . emit ( 'error' , er ) ;
704
+ if ( EElistenerCount ( dest , 'error' ) === 0 ) errorOrDestroy ( dest , er ) ;
686
705
} // Make sure our error handler is attached before userland ones.
687
706
688
707
@@ -986,8 +1005,6 @@ Readable.prototype.wrap = function (stream) {
986
1005
987
1006
if ( typeof Symbol === 'function' ) {
988
1007
Readable . prototype [ Symbol . asyncIterator ] = function ( ) {
989
- emitExperimentalWarning ( 'Readable[Symbol.asyncIterator]' ) ;
990
-
991
1008
if ( createReadableStreamAsyncIterator === undefined ) {
992
1009
createReadableStreamAsyncIterator = require ( './internal/streams/async_iterator' ) ;
993
1010
}
@@ -1075,9 +1092,29 @@ function endReadableNT(state, stream) {
1075
1092
state . endEmitted = true ;
1076
1093
stream . readable = false ;
1077
1094
stream . emit ( 'end' ) ;
1095
+
1096
+ if ( state . autoDestroy ) {
1097
+ // In case of duplex streams we need a way to detect
1098
+ // if the writable side is ready for autoDestroy as well
1099
+ var wState = stream . _writableState ;
1100
+
1101
+ if ( ! wState || wState . autoDestroy && wState . finished ) {
1102
+ stream . destroy ( ) ;
1103
+ }
1104
+ }
1078
1105
}
1079
1106
}
1080
1107
1108
+ if ( typeof Symbol === 'function' ) {
1109
+ Readable . from = function ( iterable , opts ) {
1110
+ if ( from === undefined ) {
1111
+ from = require ( './internal/streams/from' ) ;
1112
+ }
1113
+
1114
+ return from ( Readable , iterable , opts ) ;
1115
+ } ;
1116
+ }
1117
+
1081
1118
function indexOf ( xs , x ) {
1082
1119
for ( var i = 0 , l = xs . length ; i < l ; i ++ ) {
1083
1120
if ( xs [ i ] === x ) return i ;
0 commit comments