Skip to content

Commit eded1e9

Browse files
benjamingrronagatlowChemi
authored andcommitted
stream: support dispose in writable
Add support to Symbol.asyncDispose in writable streams. Additionally add a test for writable, transform and duplex streams who inherit from readable/writable to avoid breakage. Co-authored-by: Robert Nagy <ronagy@icloud.com> Co-authored-by: atlowChemi <chemi@atlow.co.il> PR-URL: #48547 Reviewed-By: Chemi Atlow <chemi@atlow.co.il> Reviewed-By: Moshe Atlow <moshe@atlow.co.il> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 5961253 commit eded1e9

File tree

5 files changed

+76
-10
lines changed

5 files changed

+76
-10
lines changed

doc/api/stream.md

+11
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,17 @@ added: v12.3.0
952952

953953
Getter for the property `objectMode` of a given `Writable` stream.
954954

955+
##### `writable[Symbol.asyncDispose]()`
956+
957+
<!-- YAML
958+
added: REPLACEME
959+
-->
960+
961+
> Stability: 1 - Experimental
962+
963+
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
964+
a promise that fulfills when the stream is finished.
965+
955966
##### `writable.write(chunk[, encoding][, callback])`
956967

957968
<!-- YAML

lib/internal/streams/writable.js

+27-10
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ const {
3232
ObjectDefineProperties,
3333
ObjectDefineProperty,
3434
ObjectSetPrototypeOf,
35+
Promise,
3536
StringPrototypeToLowerCase,
3637
Symbol,
38+
SymbolAsyncDispose,
3739
SymbolHasInstance,
3840
} = primordials;
3941

@@ -44,6 +46,7 @@ const EE = require('events');
4446
const Stream = require('internal/streams/legacy').Stream;
4547
const { Buffer } = require('buffer');
4648
const destroyImpl = require('internal/streams/destroy');
49+
const eos = require('internal/streams/end-of-stream');
4750

4851
const {
4952
addAbortSignal,
@@ -54,16 +57,19 @@ const {
5457
getDefaultHighWaterMark,
5558
} = require('internal/streams/state');
5659
const {
57-
ERR_INVALID_ARG_TYPE,
58-
ERR_METHOD_NOT_IMPLEMENTED,
59-
ERR_MULTIPLE_CALLBACK,
60-
ERR_STREAM_CANNOT_PIPE,
61-
ERR_STREAM_DESTROYED,
62-
ERR_STREAM_ALREADY_FINISHED,
63-
ERR_STREAM_NULL_VALUES,
64-
ERR_STREAM_WRITE_AFTER_END,
65-
ERR_UNKNOWN_ENCODING,
66-
} = require('internal/errors').codes;
60+
AbortError,
61+
codes: {
62+
ERR_INVALID_ARG_TYPE,
63+
ERR_METHOD_NOT_IMPLEMENTED,
64+
ERR_MULTIPLE_CALLBACK,
65+
ERR_STREAM_ALREADY_FINISHED,
66+
ERR_STREAM_CANNOT_PIPE,
67+
ERR_STREAM_DESTROYED,
68+
ERR_STREAM_NULL_VALUES,
69+
ERR_STREAM_WRITE_AFTER_END,
70+
ERR_UNKNOWN_ENCODING,
71+
},
72+
} = require('internal/errors');
6773
const {
6874
kState,
6975
// bitfields
@@ -1142,3 +1148,14 @@ Writable.fromWeb = function(writableStream, options) {
11421148
Writable.toWeb = function(streamWritable) {
11431149
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
11441150
};
1151+
1152+
Writable.prototype[SymbolAsyncDispose] = function() {
1153+
let error;
1154+
if (!this.destroyed) {
1155+
error = this.writableFinished ? null : new AbortError();
1156+
this.destroy(error);
1157+
}
1158+
return new Promise((resolve, reject) =>
1159+
eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))),
1160+
);
1161+
};

test/parallel/test-stream-duplex-destroy.js

+15
Original file line numberDiff line numberDiff line change
@@ -269,3 +269,18 @@ const assert = require('assert');
269269
}));
270270
duplex.destroy();
271271
}
272+
273+
{
274+
// Check Symbol.asyncDispose
275+
const duplex = new Duplex({
276+
write(chunk, enc, cb) { cb(); },
277+
read() {},
278+
});
279+
let count = 0;
280+
duplex.on('error', common.mustCall((e) => {
281+
assert.strictEqual(count++, 0); // Ensure not called twice
282+
assert.strictEqual(e.name, 'AbortError');
283+
}));
284+
duplex.on('close', common.mustCall());
285+
duplex[Symbol.asyncDispose]().then(common.mustCall());
286+
}

test/parallel/test-stream-transform-destroy.js

+11
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,14 @@ const assert = require('assert');
141141

142142
transform.destroy();
143143
}
144+
145+
{
146+
const transform = new Transform({
147+
transform(chunk, enc, cb) {}
148+
});
149+
transform.on('error', common.mustCall((err) => {
150+
assert.strictEqual(err.name, 'AbortError');
151+
}));
152+
transform.on('close', common.mustCall());
153+
transform[Symbol.asyncDispose]().then(common.mustCall());
154+
}

test/parallel/test-stream-writable-destroy.js

+12
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,15 @@ const assert = require('assert');
487487
}));
488488
s.destroy(_err);
489489
}
490+
491+
{
492+
const write = new Writable({
493+
write(chunk, enc, cb) { cb(); }
494+
});
495+
496+
write.on('error', common.mustCall((e) => {
497+
assert.strictEqual(e.name, 'AbortError');
498+
assert.strictEqual(write.destroyed, true);
499+
}));
500+
write[Symbol.asyncDispose]().then(common.mustCall());
501+
}

0 commit comments

Comments
 (0)