Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Node v10.17.0 #420

Merged
merged 8 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ npm install --save readable-stream

This package is a mirror of the streams implementations in Node.js.

Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.15.3/docs/api/stream.html).
Full documentation may be found on the [Node.js website](https://nodejs.org/dist/v10.17.0/docs/api/stream.html).

If you want to guarantee a stable streams base, regardless of what version of
Node you, or the users of your libraries are using, use **readable-stream** *only* and avoid the *"stream"* module in Node-core, for background see [this blogpost](http://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html).
Expand Down
6 changes: 5 additions & 1 deletion build/test-replacements.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ module.exports.all = [
]
, bufferShimFix
, bufferStaticMethods
, [
, [
/require\(['"]assert['"]\)/g
, 'require(\'assert/\')'
]
, [
/\/\/ Flags: .*/
, ''
]
]

module.exports['test-stream2-basic.js'] = [
Expand Down
110 changes: 95 additions & 15 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';

function asyncGeneratorStep(gen, resolve, reject, _next, _throw, key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { Promise.resolve(value).then(_next, _throw); } }

function _asyncToGenerator(fn) { return function () { var self = this, args = arguments; return new Promise(function (resolve, reject) { var gen = fn.apply(self, args); function _next(value) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "next", value); } function _throw(err) { asyncGeneratorStep(gen, resolve, reject, _next, _throw, "throw", err); } _next(undefined); }); }; }

function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { keys.push.apply(keys, Object.getOwnPropertySymbols(object)); } if (enumerableOnly) keys = keys.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); return keys; }

function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(source, true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(source).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; }

function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }

module.exports = Readable;
/*<replacement>*/

Expand Down Expand Up @@ -80,17 +90,15 @@ var _require$codes = require('../errors').codes,
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT;

var _require2 = require('../experimentalWarning'),
emitExperimentalWarning = _require2.emitExperimentalWarning; // Lazy loaded to improve the startup performance.
ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; // Lazy loaded to improve the startup performance.


var StringDecoder;
var createReadableStreamAsyncIterator;

require('inherits')(Readable, Stream);

var errorOrDestroy = destroyImpl.errorOrDestroy;
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];

function prependListener(emitter, event, fn) {
Expand Down Expand Up @@ -144,7 +152,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true; // Should close be emitted on destroy. Defaults to true.

this.emitClose = options.emitClose !== false; // has it been destroyed
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish')

this.autoDestroy = !!options.autoDestroy; // has it been destroyed

this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand Down Expand Up @@ -257,16 +267,16 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (!skipChunkCheck) er = chunkInvalid(state, chunk);

if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = _uint8ArrayToBuffer(chunk);
}

if (addToFront) {
if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
Expand Down Expand Up @@ -322,9 +332,23 @@ Readable.prototype.isPaused = function () {

Readable.prototype.setEncoding = function (enc) {
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this._readableState.decoder = new StringDecoder(enc); // if setEncoding(null), decoder.encoding equals utf8
var decoder = new StringDecoder(enc);
this._readableState.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8

this._readableState.encoding = this._readableState.decoder.encoding; // Iterate over current buffer to convert already stored Buffers:

this._readableState.encoding = this._readableState.decoder.encoding;
var p = this._readableState.buffer.head;
var content = '';

while (p !== null) {
content += decoder.write(p.data);
p = p.next;
}

this._readableState.buffer.clear();

if (content !== '') this._readableState.buffer.push(content);
this._readableState.length = content.length;
return this;
}; // Don't raise the hwm > 8MB

Expand Down Expand Up @@ -449,7 +473,7 @@ Readable.prototype.read = function (n) {
if (n > 0) ret = fromList(n, state);else ret = null;

if (ret === null) {
state.needReadable = true;
state.needReadable = state.length <= state.highWaterMark;
n = 0;
} else {
state.length -= n;
Expand All @@ -469,6 +493,7 @@ Readable.prototype.read = function (n) {
};

function onEofChunk(stream, state) {
debug('onEofChunk');
if (state.ended) return;

if (state.decoder) {
Expand Down Expand Up @@ -503,6 +528,7 @@ function onEofChunk(stream, state) {

function emitReadable(stream) {
var state = stream._readableState;
debug('emitReadable', state.needReadable, state.emittedReadable);
state.needReadable = false;

if (!state.emittedReadable) {
Expand All @@ -518,6 +544,7 @@ function emitReadable_(stream) {

if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
state.emittedReadable = false;
} // The stream needs another readable event if
// 1. It is not flowing, as the flow mechanism will take
// care of it.
Expand Down Expand Up @@ -583,7 +610,7 @@ function maybeReadMore_(stream, state) {


Readable.prototype._read = function (n) {
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};

Readable.prototype.pipe = function (dest, pipeOpts) {
Expand Down Expand Up @@ -682,7 +709,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er);
} // Make sure our error handler is attached before userland ones.


Expand Down Expand Up @@ -986,8 +1013,6 @@ Readable.prototype.wrap = function (stream) {

if (typeof Symbol === 'function') {
Readable.prototype[Symbol.asyncIterator] = function () {
emitExperimentalWarning('Readable[Symbol.asyncIterator]');

if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator = require('./internal/streams/async_iterator');
}
Expand Down Expand Up @@ -1075,9 +1100,64 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the writable side is ready for autoDestroy as well
var wState = stream._writableState;

if (!wState || wState.autoDestroy && wState.finished) {
stream.destroy();
}
}
}
}

Readable.from = function (iterable, opts) {
var iterator;
if (iterable && iterable[Symbol.asyncIterator]) iterator = iterable[Symbol.asyncIterator]();else if (iterable && iterable[Symbol.iterator]) iterator = iterable[Symbol.iterator]();else throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
var readable = new Readable(_objectSpread({
objectMode: true
}, opts)); // Reading boolean to protect against _read
// being called before last iteration completion.

var reading = false;

readable._read = function () {
if (!reading) {
reading = true;
next();
}
};

function next() {
return _next2.apply(this, arguments);
}

function _next2() {
_next2 = _asyncToGenerator(function* () {
try {
var _ref = yield iterator.next(),
value = _ref.value,
done = _ref.done;

if (done) {
readable.push(null);
} else if (readable.push((yield value))) {
next();
} else {
reading = false;
}
} catch (err) {
readable.destroy(err);
}
});
return _next2.apply(this, arguments);
}

return readable;
};

function indexOf(xs, x) {
for (var i = 0, l = xs.length; i < l; i++) {
if (xs[i] === x) return i;
Expand Down
28 changes: 21 additions & 7 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ var _require$codes = require('../errors').codes,
ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;

var errorOrDestroy = destroyImpl.errorOrDestroy;

require('inherits')(Writable, Stream);

function nop() {}
Expand Down Expand Up @@ -173,7 +175,9 @@ function WritableState(options, stream, isDuplex) {

this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true.

this.emitClose = options.emitClose !== false; // count buffered requests
this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'finish' (and potentially 'end')

this.autoDestroy = !!options.autoDestroy; // count buffered requests

this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always
// one allocated and free to use, and we maintain at most two
Expand Down Expand Up @@ -250,13 +254,13 @@ function Writable(options) {


Writable.prototype.pipe = function () {
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};

function writeAfterEnd(stream, cb) {
var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb

stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
} // Checks that a user-supplied chunk is valid, especially for the particular
// mode the stream is in. Currently this means that `null` is never accepted
Expand All @@ -273,7 +277,7 @@ function validChunk(stream, state, chunk, cb) {
}

if (er) {
stream.emit('error', er);
errorOrDestroy(stream, er);
process.nextTick(cb, er);
return false;
}
Expand Down Expand Up @@ -417,13 +421,13 @@ function onwriteError(stream, state, sync, er, cb) {

process.nextTick(finishMaybe, stream, state);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
errorOrDestroy(stream, er);
} else {
// the caller expect this to happen before if
// it is async
cb(er);
stream._writableState.errorEmitted = true;
stream.emit('error', er); // this can emit finish, but finish must
errorOrDestroy(stream, er); // this can emit finish, but finish must
// always follow error

finishMaybe(stream, state);
Expand Down Expand Up @@ -587,7 +591,7 @@ function callFinal(stream, state) {
state.pendingcb--;

if (err) {
stream.emit('error', err);
errorOrDestroy(stream, err);
}

state.prefinished = true;
Expand Down Expand Up @@ -618,6 +622,16 @@ function finishMaybe(stream, state) {
if (state.pendingcb === 0) {
state.finished = true;
stream.emit('finish');

if (state.autoDestroy) {
// In case of duplex streams we need a way to detect
// if the readable side is ready for autoDestroy as well
var rState = stream._readableState;

if (!rState || rState.autoDestroy && rState.endEmitted) {
stream.destroy();
}
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion lib/internal/streams/buffer_list.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; }
function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { keys.push.apply(keys, Object.getOwnPropertySymbols(object)); } if (enumerableOnly) keys = keys.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); return keys; }

function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(source, true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(source).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; }

function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }

Expand Down
32 changes: 26 additions & 6 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ function destroy(err, cb) {
if (readableDestroyed || writableDestroyed) {
if (cb) {
cb(err);
} else if (err && (!this._writableState || !this._writableState.errorEmitted)) {
process.nextTick(emitErrorNT, this, err);
} else if (err) {
if (!this._writableState) {
process.nextTick(emitErrorNT, this, err);
} else if (!this._writableState.errorEmitted) {
this._writableState.errorEmitted = true;
process.nextTick(emitErrorNT, this, err);
}
}

return this;
Expand All @@ -29,10 +34,13 @@ function destroy(err, cb) {

this._destroy(err || null, function (err) {
if (!cb && err) {
process.nextTick(emitErrorAndCloseNT, _this, err);

if (_this._writableState) {
if (!_this._writableState) {
process.nextTick(emitErrorAndCloseNT, _this, err);
} else if (!_this._writableState.errorEmitted) {
_this._writableState.errorEmitted = true;
process.nextTick(emitErrorAndCloseNT, _this, err);
} else {
process.nextTick(emitCloseNT, _this);
}
} else if (cb) {
process.nextTick(emitCloseNT, _this);
Expand Down Expand Up @@ -79,7 +87,19 @@ function emitErrorNT(self, err) {
self.emit('error', err);
}

function errorOrDestroy(stream, err) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
// the error to be emitted nextTick. In a future
// semver major update we should change the default to this.
var rState = stream._readableState;
var wState = stream._writableState;
if (rState && rState.autoDestroy || wState && wState.autoDestroy) stream.destroy(err);else stream.emit('error', err);
}

module.exports = {
destroy: destroy,
undestroy: undestroy
undestroy: undestroy,
errorOrDestroy: errorOrDestroy
};
Loading