Skip to content

Commit fb8cc72

Browse files
committed
stream: construct
Provide a standardized way of asynchronously creating and initializing resources before performing any work. Refs: nodejs#29314 PR-URL: nodejs#29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
1 parent 9949a2e commit fb8cc72

File tree

5 files changed

+544
-20
lines changed

5 files changed

+544
-20
lines changed

doc/api/stream.md

+164-7
Original file line numberDiff line numberDiff line change
@@ -550,8 +550,7 @@ added: v9.3.0
550550

551551
* {number}
552552

553-
Return the value of `highWaterMark` passed when constructing this
554-
`Writable`.
553+
Return the value of `highWaterMark` passed when creating this `Writable`.
555554

556555
##### `writable.writableLength`
557556
<!-- YAML
@@ -1193,8 +1192,7 @@ added: v9.3.0
11931192

11941193
* {number}
11951194

1196-
Returns the value of `highWaterMark` passed when constructing this
1197-
`Readable`.
1195+
Returns the value of `highWaterMark` passed when creating this `Readable`.
11981196

11991197
##### `readable.readableLength`
12001198
<!-- YAML
@@ -1792,7 +1790,7 @@ expectations.
17921790
added: v1.2.0
17931791
-->
17941792

1795-
For many simple cases, it is possible to construct a stream without relying on
1793+
For many simple cases, it is possible to create a stream without relying on
17961794
inheritance. This can be accomplished by directly creating instances of the
17971795
`stream.Writable`, `stream.Readable`, `stream.Duplex` or `stream.Transform`
17981796
objects and passing appropriate methods as constructor options.
@@ -1801,8 +1799,14 @@ objects and passing appropriate methods as constructor options.
18011799
const { Writable } = require('stream');
18021800

18031801
const myWritable = new Writable({
1802+
construct(callback) {
1803+
// Initialize state and load resources...
1804+
},
18041805
write(chunk, encoding, callback) {
18051806
// ...
1807+
},
1808+
destroy() {
1809+
// Free resources...
18061810
}
18071811
});
18081812
```
@@ -1861,6 +1865,8 @@ changes:
18611865
[`stream._destroy()`][writable-_destroy] method.
18621866
* `final` {Function} Implementation for the
18631867
[`stream._final()`][stream-_final] method.
1868+
* `construct` {Function} Implementation for the
1869+
[`stream._construct()`][writable-_construct] method.
18641870
* `autoDestroy` {boolean} Whether this stream should automatically call
18651871
`.destroy()` on itself after ending. **Default:** `true`.
18661872

@@ -1906,6 +1912,56 @@ const myWritable = new Writable({
19061912
});
19071913
```
19081914

1915+
#### `writable._construct(callback)`
1916+
<!-- YAML
1917+
added: REPLACEME
1918+
-->
1919+
1920+
* `callback` {Function} Call this function (optionally with an error
1921+
argument) when the stream has finished initializing.
1922+
1923+
The `_construct()` method MUST NOT be called directly. It may be implemented
1924+
by child classes, and if so, will be called by the internal `Writable`
1925+
class methods only.
1926+
1927+
This optional function will be called in a tick after the stream constructor
1928+
has returned, delaying any `_write`, `_final` and `_destroy` calls until
1929+
`callback` is called. This is useful to initialize state or asynchronously
1930+
initialize resources before the stream can be used.
1931+
1932+
```js
1933+
const { Writable } = require('stream');
1934+
const fs = require('fs');
1935+
1936+
class WriteStream extends Writable {
1937+
constructor(filename) {
1938+
super();
1939+
this.filename = filename;
1940+
this.fd = fd;
1941+
}
1942+
_construct(callback) {
1943+
fs.open(this.filename, (fd, err) => {
1944+
if (err) {
1945+
callback(err);
1946+
} else {
1947+
this.fd = fd;
1948+
callback();
1949+
}
1950+
});
1951+
}
1952+
_write(chunk, encoding, callback) {
1953+
fs.write(this.fd, chunk, callback);
1954+
}
1955+
_destroy(err, callback) {
1956+
if (this.fd) {
1957+
fs.close(this.fd, (er) => callback(er || err));
1958+
} else {
1959+
callback(err);
1960+
}
1961+
}
1962+
}
1963+
```
1964+
19091965
#### `writable._write(chunk, encoding, callback)`
19101966
<!-- YAML
19111967
changes:
@@ -2130,6 +2186,8 @@ changes:
21302186
method.
21312187
* `destroy` {Function} Implementation for the
21322188
[`stream._destroy()`][readable-_destroy] method.
2189+
* `construct` {Function} Implementation for the
2190+
[`stream._construct()`][readable-_construct] method.
21332191
* `autoDestroy` {boolean} Whether this stream should automatically call
21342192
`.destroy()` on itself after ending. **Default:** `true`.
21352193

@@ -2172,6 +2230,63 @@ const myReadable = new Readable({
21722230
});
21732231
```
21742232

2233+
#### `readable._construct(callback)`
2234+
<!-- YAML
2235+
added: REPLACEME
2236+
-->
2237+
2238+
* `callback` {Function} Call this function (optionally with an error
2239+
argument) when the stream has finished initializing.
2240+
2241+
The `_construct()` method MUST NOT be called directly. It may be implemented
2242+
by child classes, and if so, will be called by the internal `Readable`
2243+
class methods only.
2244+
2245+
This optional function will be called by the stream constructor,
2246+
delaying any `_read` and `_destroy` calls until `callback` is called. This is
2247+
useful to initialize state or asynchronously initialize resources before the
2248+
stream can be used.
2249+
2250+
```js
2251+
const { Readable } = require('stream');
2252+
const fs = require('fs');
2253+
2254+
class ReadStream extends Readable {
2255+
constructor(filename) {
2256+
super();
2257+
this.filename = filename;
2258+
this.fd = null;
2259+
}
2260+
_construct(callback) {
2261+
fs.open(this.filename, (fd, err) => {
2262+
if (err) {
2263+
callback(err);
2264+
} else {
2265+
this.fd = fd;
2266+
callback();
2267+
}
2268+
});
2269+
}
2270+
_read(n) {
2271+
const buf = Buffer.alloc(n);
2272+
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
2273+
if (err) {
2274+
this.destroy(err);
2275+
} else {
2276+
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
2277+
}
2278+
});
2279+
}
2280+
_destroy(err, callback) {
2281+
if (this.fd) {
2282+
fs.close(this.fd, (er) => callback(er || err));
2283+
} else {
2284+
callback(err);
2285+
}
2286+
}
2287+
}
2288+
```
2289+
21752290
#### `readable._read(size)`
21762291
<!-- YAML
21772292
added: v0.9.4
@@ -2427,6 +2542,46 @@ const myDuplex = new Duplex({
24272542
});
24282543
```
24292544

2545+
When using pipeline:
2546+
2547+
```js
2548+
const { Transform, pipeline } = require('stream');
2549+
const fs = require('fs');
2550+
2551+
pipeline(
2552+
fs.createReadStream('object.json')
2553+
.setEncoding('utf-8'),
2554+
new Transform({
2555+
decodeStrings: false, // Accept string input rather than Buffers
2556+
construct(callback) {
2557+
this.data = '';
2558+
callback();
2559+
},
2560+
transform(chunk, encoding, callback) {
2561+
this.data += chunk;
2562+
callback();
2563+
},
2564+
flush(callback) {
2565+
try {
2566+
// Make sure is valid json.
2567+
JSON.parse(this.data);
2568+
this.push(this.data);
2569+
} catch (err) {
2570+
callback(err);
2571+
}
2572+
}
2573+
}),
2574+
fs.createWriteStream('valid-object.json'),
2575+
(err) => {
2576+
if (err) {
2577+
console.error('failed', err);
2578+
} else {
2579+
console.log('completed');
2580+
}
2581+
}
2582+
);
2583+
```
2584+
24302585
#### An Example Duplex Stream
24312586

24322587
The following illustrates a simple example of a `Duplex` stream that wraps a
@@ -2706,8 +2861,8 @@ unhandled post-destroy errors.
27062861

27072862
#### Creating Readable Streams with Async Generators
27082863

2709-
We can construct a Node.js Readable Stream from an asynchronous generator
2710-
using the `Readable.from()` utility method:
2864+
A Node.js Readable Stream can be created from an asynchronous generator using
2865+
the `Readable.from()` utility method:
27112866

27122867
```js
27132868
const { Readable } = require('stream');
@@ -2960,6 +3115,7 @@ contain multi-byte characters.
29603115
[http-incoming-message]: http.html#http_class_http_incomingmessage
29613116
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
29623117
[object-mode]: #stream_object_mode
3118+
[readable-_construct]: #stream_readable_construct_callback
29633119
[readable-_destroy]: #stream_readable_destroy_err_callback
29643120
[readable-destroy]: #stream_readable_destroy_error
29653121
[stream-_final]: #stream_writable_final_callback
@@ -2976,6 +3132,7 @@ contain multi-byte characters.
29763132
[stream-uncork]: #stream_writable_uncork
29773133
[stream-write]: #stream_writable_write_chunk_encoding_callback
29783134
[Stream Three States]: #stream_three_states
3135+
[writable-_construct]: #stream_writable_construct_callback
29793136
[writable-_destroy]: #stream_writable_destroy_err_callback
29803137
[writable-destroy]: #stream_writable_destroy_error
29813138
[writable-new]: #stream_constructor_new_stream_writable_options

lib/_stream_readable.js

+19-5
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,12 @@ function ReadableState(options, stream, isDuplex) {
118118
this.endEmitted = false;
119119
this.reading = false;
120120

121+
// Stream is still being constructed and cannot be
122+
// destroyed until construction finished or failed.
123+
// Async construction is opt in, therefore we start as
124+
// constructed.
125+
this.constructed = true;
126+
121127
// A flag to be able to tell if the event 'readable'/'data' is emitted
122128
// immediately, or on a later tick. We set this to true at first, because
123129
// any actions that shouldn't happen until "later" should generally also
@@ -197,9 +203,16 @@ function Readable(options) {
197203

198204
if (typeof options.destroy === 'function')
199205
this._destroy = options.destroy;
206+
207+
if (typeof options.construct === 'function')
208+
this._construct = options.construct;
200209
}
201210

202211
Stream.call(this, options);
212+
213+
destroyImpl.construct(this, () => {
214+
maybeReadMore(this, this._readableState);
215+
});
203216
}
204217

205218
Readable.prototype.destroy = destroyImpl.destroy;
@@ -461,11 +474,12 @@ Readable.prototype.read = function(n) {
461474
}
462475

463476
// However, if we've ended, then there's no point, if we're already
464-
// reading, then it's unnecessary, and if we're destroyed or errored,
465-
// then it's not allowed.
466-
if (state.ended || state.reading || state.destroyed || state.errored) {
477+
// reading, then it's unnecessary, if we're constructing we have to wait,
478+
// and if we're destroyed or errored, then it's not allowed,
479+
if (state.ended || state.reading || state.destroyed || state.errored ||
480+
!state.constructed) {
467481
doRead = false;
468-
debug('reading or ended', doRead);
482+
debug('reading, ended or constructing', doRead);
469483
} else if (doRead) {
470484
debug('do read');
471485
state.reading = true;
@@ -587,7 +601,7 @@ function emitReadable_(stream) {
587601
// However, if we're not ended, or reading, and the length < hwm,
588602
// then go ahead and try to read some more preemptively.
589603
function maybeReadMore(stream, state) {
590-
if (!state.readingMore) {
604+
if (!state.readingMore && state.constructed) {
591605
state.readingMore = true;
592606
process.nextTick(maybeReadMore_, stream, state);
593607
}

lib/_stream_writable.js

+25-2
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ function WritableState(options, stream, isDuplex) {
155155
// this must be 0 before 'finish' can be emitted.
156156
this.pendingcb = 0;
157157

158+
// Stream is still being constructed and cannot be
159+
// destroyed until construction finished or failed.
160+
// Async construction is opt in, therefore we start as
161+
// constructed.
162+
this.constructed = true;
163+
158164
// Emit prefinish if the only thing we're waiting for is _write cbs
159165
// This is relevant for synchronous Transform streams.
160166
this.prefinished = false;
@@ -249,9 +255,22 @@ function Writable(options) {
249255

250256
if (typeof options.final === 'function')
251257
this._final = options.final;
258+
259+
if (typeof options.construct === 'function')
260+
this._construct = options.construct;
252261
}
253262

254263
Stream.call(this, options);
264+
265+
destroyImpl.construct(this, () => {
266+
const state = this._writableState;
267+
268+
if (!state.writing) {
269+
clearBuffer(this, state);
270+
}
271+
272+
finishMaybe(this, state);
273+
});
255274
}
256275

257276
// Otherwise people can pipe Writable streams, which is just wrong.
@@ -342,7 +361,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
342361

343362
state.length += len;
344363

345-
if (state.writing || state.corked || state.errored) {
364+
if (state.writing || state.corked || state.errored || !state.constructed) {
346365
state.buffered.push({ chunk, encoding, callback });
347366
if (state.allBuffers && encoding !== 'buffer') {
348367
state.allBuffers = false;
@@ -492,7 +511,10 @@ function errorBuffer(state, err) {
492511

493512
// If there's something in the buffer waiting, then process it.
494513
function clearBuffer(stream, state) {
495-
if (state.corked || state.bufferProcessing || state.destroyed) {
514+
if (state.corked ||
515+
state.bufferProcessing ||
516+
state.destroyed ||
517+
!state.constructed) {
496518
return;
497519
}
498520

@@ -600,6 +622,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
600622

601623
function needFinish(state) {
602624
return (state.ending &&
625+
state.constructed &&
603626
state.length === 0 &&
604627
!state.errored &&
605628
state.buffered.length === 0 &&

0 commit comments

Comments
 (0)