Skip to content

Commit 03b85ec

Browse files
committed
stream: invoke buffered write callbacks on error
Buffered write callbacks were only invoked upon error if `autoDestroy` was invoked. Backport-PR-URL: nodejs#31179 PR-URL: nodejs#30596 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent ca23511 commit 03b85ec

File tree

2 files changed

+121
-6
lines changed

2 files changed

+121
-6
lines changed

lib/_stream_writable.js

+26-6
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,11 @@ function onwriteError(stream, state, er, cb) {
458458
--state.pendingcb;
459459

460460
cb(er);
461+
// Ensure callbacks are invoked even when autoDestroy is
462+
// not enabled. Passing `er` here doesn't make sense since
463+
// it's related to one specific write, not to the buffered
464+
// writes.
465+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
461466
// This can emit error, but error must always follow cb.
462467
errorOrDestroy(stream, er);
463468
}
@@ -529,9 +534,29 @@ function afterWrite(stream, state, count, cb) {
529534
cb();
530535
}
531536

537+
if (state.destroyed) {
538+
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
539+
}
540+
532541
finishMaybe(stream, state);
533542
}
534543

544+
// If there's something in the buffer waiting, then invoke callbacks.
545+
function errorBuffer(state, err) {
546+
if (state.writing || !state.bufferedRequest) {
547+
return;
548+
}
549+
550+
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
551+
const len = state.objectMode ? 1 : entry.chunk.length;
552+
state.length -= len;
553+
entry.callback(err);
554+
}
555+
state.bufferedRequest = null;
556+
state.lastBufferedRequest = null;
557+
state.bufferedRequestCount = 0;
558+
}
559+
535560
// If there's something in the buffer waiting, then process it
536561
function clearBuffer(stream, state) {
537562
state.bufferProcessing = true;
@@ -781,12 +806,7 @@ const destroy = destroyImpl.destroy;
781806
Writable.prototype.destroy = function(err, cb) {
782807
const state = this._writableState;
783808
if (!state.destroyed) {
784-
for (let entry = state.bufferedRequest; entry; entry = entry.next) {
785-
process.nextTick(entry.callback, new ERR_STREAM_DESTROYED('write'));
786-
}
787-
state.bufferedRequest = null;
788-
state.lastBufferedRequest = null;
789-
state.bufferedRequestCount = 0;
809+
process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
790810
}
791811
destroy.call(this, err, cb);
792812
return this;

test/parallel/test-stream-writable-destroy.js

+95
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,98 @@ const assert = require('assert');
292292
}));
293293
write.uncork();
294294
}
295+
296+
{
297+
// Call end(cb) after error & destroy
298+
299+
const write = new Writable({
300+
write(chunk, enc, cb) { cb(new Error('asd')); }
301+
});
302+
write.on('error', common.mustCall(() => {
303+
write.destroy();
304+
let ticked = false;
305+
write.end(common.mustCall((err) => {
306+
assert.strictEqual(ticked, true);
307+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
308+
}));
309+
ticked = true;
310+
}));
311+
write.write('asd');
312+
}
313+
314+
{
315+
// Call end(cb) after finish & destroy
316+
317+
const write = new Writable({
318+
write(chunk, enc, cb) { cb(); }
319+
});
320+
write.on('finish', common.mustCall(() => {
321+
write.destroy();
322+
let ticked = false;
323+
write.end(common.mustCall((err) => {
324+
assert.strictEqual(ticked, false);
325+
assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
326+
}));
327+
ticked = true;
328+
}));
329+
write.end();
330+
}
331+
332+
{
333+
// Call end(cb) after error & destroy and don't trigger
334+
// unhandled exception.
335+
336+
const write = new Writable({
337+
write(chunk, enc, cb) { process.nextTick(cb); }
338+
});
339+
write.once('error', common.mustCall((err) => {
340+
assert.strictEqual(err.message, 'asd');
341+
}));
342+
write.end('asd', common.mustCall((err) => {
343+
assert.strictEqual(err.message, 'asd');
344+
}));
345+
write.destroy(new Error('asd'));
346+
}
347+
348+
{
349+
// Call buffered write callback with error
350+
351+
const write = new Writable({
352+
write(chunk, enc, cb) {
353+
process.nextTick(cb, new Error('asd'));
354+
},
355+
autoDestroy: false
356+
});
357+
write.cork();
358+
write.write('asd', common.mustCall((err) => {
359+
assert.strictEqual(err.message, 'asd');
360+
}));
361+
write.write('asd', common.mustCall((err) => {
362+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
363+
}));
364+
write.on('error', common.mustCall((err) => {
365+
assert.strictEqual(err.message, 'asd');
366+
}));
367+
write.uncork();
368+
}
369+
370+
{
371+
// Ensure callback order.
372+
373+
let state = 0;
374+
const write = new Writable({
375+
write(chunk, enc, cb) {
376+
// `setImmediate()` is used on purpose to ensure the callback is called
377+
// after `process.nextTick()` callbacks.
378+
setImmediate(cb);
379+
}
380+
});
381+
write.write('asd', common.mustCall(() => {
382+
assert.strictEqual(state++, 0);
383+
}));
384+
write.write('asd', common.mustCall((err) => {
385+
assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
386+
assert.strictEqual(state++, 1);
387+
}));
388+
write.destroy();
389+
}

0 commit comments

Comments
 (0)