Skip to content

Commit d0a0071

Browse files
ronagcodebytere
authored andcommitted
stream: invoke buffered write callbacks on error
Refs: #30596 Buffered write callbacks were only invoked upon error if `autoDestroy` was invoked. Backport-PR-URL: #31179 PR-URL: #30596 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent f71fc90 commit d0a0071

File tree

2 files changed

+69
-6
lines changed

2 files changed

+69
-6
lines changed

lib/_stream_writable.js

+26-6
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,11 @@ function onwriteError(stream, state, er, cb) {
459459
--state.pendingcb;
460460

461461
cb(er);
462+
// Ensure callbacks are invoked even when autoDestroy is
463+
// not enabled. Passing `er` here doesn't make sense since
464+
// it's related to one specific write, not to the buffered
465+
// writes.
466+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
462467
// This can emit error, but error must always follow cb.
463468
errorOrDestroy(stream, er);
464469
}
@@ -530,9 +535,29 @@ function afterWrite(stream, state, count, cb) {
530535
cb();
531536
}
532537

538+
if (state.destroyed) {
539+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
540+
}
541+
533542
finishMaybe(stream, state);
534543
}
535544

545+
// If there's something in the buffer waiting, then invoke callbacks.
546+
function errorBuffer(state, err) {
547+
if (state.writing || !state.bufferedRequest) {
548+
return;
549+
}
550+
551+
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
552+
const len = state.objectMode ? 1 : entry.chunk.length;
553+
state.length -= len;
554+
entry.callback(err);
555+
}
556+
state.bufferedRequest = null;
557+
state.lastBufferedRequest = null;
558+
state.bufferedRequestCount = 0;
559+
}
560+
536561
// If there's something in the buffer waiting, then process it
537562
function clearBuffer(stream, state) {
538563
state.bufferProcessing = true;
@@ -782,12 +807,7 @@ const destroy = destroyImpl.destroy;
782807
Writable.prototype.destroy = function(err, cb) {
783808
const state = this._writableState;
784809
if (!state.destroyed) {
785-
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
786-
process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write'));
787-
}
788-
state.bufferedRequest = null;
789-
state.lastBufferedRequest = null;
790-
state.bufferedRequestCount = 0;
810+
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
791811
}
792812
destroy.call(this, err, cb);
793813
return this;

test/parallel/test-stream-writable-destroy.js

+43
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,46 @@ const assert = require('assert');
292292
}));
293293
write.uncork();
294294
}
295+
296+
{
297+
// Call buffered write callback with error
298+
299+
const write = new Writable({
300+
write(chunk, enc, cb) {
301+
process.nextTick(cb, new Error('asd'));
302+
},
303+
autoDestroy: false
304+
});
305+
write.cork();
306+
write.write('asd', common.mustCall((err) => {
307+
assert.strictEqual(err.message, 'asd');
308+
}));
309+
write.write('asd', common.mustCall((err) => {
310+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
311+
}));
312+
write.on('error', common.mustCall((err) => {
313+
assert.strictEqual(err.message, 'asd');
314+
}));
315+
write.uncork();
316+
}
317+
318+
{
319+
// Ensure callback order.
320+
321+
let state = 0;
322+
const write = new Writable({
323+
write(chunk, enc, cb) {
324+
// `setImmediate()` is used on purpose to ensure the callback is called
325+
// after `process.nextTick()` callbacks.
326+
setImmediate(cb);
327+
}
328+
});
329+
write.write('asd', common.mustCall(() => {
330+
assert.strictEqual(state++, 0);
331+
}));
332+
write.write('asd', common.mustCall((err) => {
333+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
334+
assert.strictEqual(state++, 1);
335+
}));
336+
write.destroy();
337+
}

0 commit comments

Comments
 (0)