Skip to content

Commit a15e712

Browse files
ronagtargos
authored andcommitted
fs: allow overriding fs for streams
Allow overriding open, write, and close when using createReadStream() and createWriteStream(). PR-URL: #29083 Refs: #29050 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 709d3e5 commit a15e712

File tree

4 files changed

+164
-41
lines changed

4 files changed

+164
-41
lines changed

doc/api/fs.md

+24-2
Original file line numberDiff line numberDiff line change
@@ -1674,6 +1674,10 @@ changes:
16741674
- version: v2.3.0
16751675
pr-url: https://github.com/nodejs/node/pull/1845
16761676
description: The passed `options` object can be a string now.
1677+
- version: REPLACEME
1678+
pr-url: https://github.com/nodejs/node/pull/29083
1679+
description: The `fs` options allow overriding the used `fs`
1680+
implementation.
16771681
-->
16781682

16791683
* `path` {string|Buffer|URL}
@@ -1688,7 +1692,8 @@ changes:
16881692
* `start` {integer}
16891693
* `end` {integer} **Default:** `Infinity`
16901694
* `highWaterMark` {integer} **Default:** `64 * 1024`
1691-
* Returns: {fs.ReadStream}
1695+
* `fs` {Object|null} **Default:** `null`
1696+
* Returns: {fs.ReadStream} See [Readable Stream][].
16921697

16931698
Unlike the 16 kb default `highWaterMark` for a readable stream, the stream
16941699
returned by this method has a default `highWaterMark` of 64 kb.
@@ -1715,6 +1720,10 @@ By default, the stream will not emit a `'close'` event after it has been
17151720
destroyed. This is the opposite of the default for other `Readable` streams.
17161721
Set the `emitClose` option to `true` to change this behavior.
17171722

1723+
By providing the `fs` option it is possible to override the corresponding `fs`
1724+
implementations for `open`, `read` and `close`. When providing the `fs` option,
1725+
you must override `open`, `close` and `read`.
1726+
17181727
```js
17191728
const fs = require('fs');
17201729
// Create a stream from some character device.
@@ -1768,6 +1777,10 @@ changes:
17681777
- version: v2.3.0
17691778
pr-url: https://github.com/nodejs/node/pull/1845
17701779
description: The passed `options` object can be a string now.
1780+
- version: REPLACEME
1781+
pr-url: https://github.com/nodejs/node/pull/REPLACEME
1782+
description: The `fs` options allow overriding the used `fs`
1783+
implementation.
17711784
-->
17721785

17731786
* `path` {string|Buffer|URL}
@@ -1780,7 +1793,8 @@ changes:
17801793
* `autoClose` {boolean} **Default:** `true`
17811794
* `emitClose` {boolean} **Default:** `false`
17821795
* `start` {integer}
1783-
* Returns: {fs.WriteStream}
1796+
* `fs` {Object|null} **Default:** `null`
1797+
* Returns: {fs.WriteStream} See [Writable Stream][].
17841798

17851799
`options` may also include a `start` option to allow writing data at
17861800
some position past the beginning of the file, allowed values are in the
@@ -1799,6 +1813,12 @@ By default, the stream will not emit a `'close'` event after it has been
17991813
destroyed. This is the opposite of the default for other `Writable` streams.
18001814
Set the `emitClose` option to `true` to change this behavior.
18011815

1816+
By providing the `fs` option it is possible to override the corresponding `fs`
1817+
implementations for `open`, `write`, `writev` and `close`. Overriding `write()`
1818+
without `writev()` can reduce performance as some optimizations (`_writev()`)
1819+
will be disabled. When providing the `fs` option, you must override `open`,
1820+
`close` and at least one of `write` and `writev`.
1821+
18021822
Like [`ReadStream`][], if `fd` is specified, [`WriteStream`][] will ignore the
18031823
`path` argument and will use the specified file descriptor. This means that no
18041824
`'open'` event will be emitted. `fd` should be blocking; non-blocking `fd`s
@@ -5528,6 +5548,7 @@ the file contents.
55285548
[`Number.MAX_SAFE_INTEGER`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
55295549
[`ReadDirectoryChangesW`]: https://docs.microsoft.com/en-us/windows/desktop/api/winbase/nf-winbase-readdirectorychangesw
55305550
[`ReadStream`]: #fs_class_fs_readstream
5551+
[Readable Stream]: #stream_class_stream_readable
55315552
[`URL`]: url.html#url_the_whatwg_url_api
55325553
[`UV_THREADPOOL_SIZE`]: cli.html#cli_uv_threadpool_size_size
55335554
[`WriteStream`]: #fs_class_fs_writestream
@@ -5587,3 +5608,4 @@ the file contents.
55875608
[chcp]: https://ss64.com/nt/chcp.html
55885609
[inode]: https://en.wikipedia.org/wiki/Inode
55895610
[support of file system `flags`]: #fs_file_system_flags
5611+
[Writable Stream]: #stream_class_stream_writable

lib/internal/fs/streams.js

+91-37
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const {
1111
} = primordials;
1212

1313
const {
14+
ERR_INVALID_ARG_TYPE,
1415
ERR_OUT_OF_RANGE,
1516
ERR_STREAM_DESTROYED
1617
} = require('internal/errors').codes;
@@ -27,6 +28,7 @@ const kIoDone = Symbol('kIoDone');
2728
const kIsPerformingIO = Symbol('kIsPerformingIO');
2829

2930
const kMinPoolSpace = 128;
31+
const kFs = Symbol('kFs');
3032

3133
let pool;
3234
// It can happen that we expect to read a large chunk of data, and reserve
@@ -75,6 +77,23 @@ function ReadStream(path, options) {
7577
options.emitClose = false;
7678
}
7779

80+
this[kFs] = options.fs || fs;
81+
82+
if (typeof this[kFs].open !== 'function') {
83+
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
84+
this[kFs].open);
85+
}
86+
87+
if (typeof this[kFs].read !== 'function') {
88+
throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
89+
this[kFs].read);
90+
}
91+
92+
if (typeof this[kFs].close !== 'function') {
93+
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
94+
this[kFs].close);
95+
}
96+
7897
Readable.call(this, options);
7998

8099
// Path will be ignored when fd is specified, so it can be falsy
@@ -124,7 +143,7 @@ ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
124143
ObjectSetPrototypeOf(ReadStream, Readable);
125144

126145
ReadStream.prototype.open = function() {
127-
fs.open(this.path, this.flags, this.mode, (er, fd) => {
146+
this[kFs].open(this.path, this.flags, this.mode, (er, fd) => {
128147
if (er) {
129148
if (this.autoClose) {
130149
this.destroy();
@@ -174,42 +193,43 @@ ReadStream.prototype._read = function(n) {
174193

175194
// the actual read.
176195
this[kIsPerformingIO] = true;
177-
fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
178-
this[kIsPerformingIO] = false;
179-
// Tell ._destroy() that it's safe to close the fd now.
180-
if (this.destroyed) return this.emit(kIoDone, er);
181-
182-
if (er) {
183-
if (this.autoClose) {
184-
this.destroy();
185-
}
186-
this.emit('error', er);
187-
} else {
188-
let b = null;
189-
// Now that we know how much data we have actually read, re-wind the
190-
// 'used' field if we can, and otherwise allow the remainder of our
191-
// reservation to be used as a new pool later.
192-
if (start + toRead === thisPool.used && thisPool === pool) {
193-
const newUsed = thisPool.used + bytesRead - toRead;
194-
thisPool.used = roundUpToMultipleOf8(newUsed);
196+
this[kFs].read(
197+
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
198+
this[kIsPerformingIO] = false;
199+
// Tell ._destroy() that it's safe to close the fd now.
200+
if (this.destroyed) return this.emit(kIoDone, er);
201+
202+
if (er) {
203+
if (this.autoClose) {
204+
this.destroy();
205+
}
206+
this.emit('error', er);
195207
} else {
196-
// Round down to the next lowest multiple of 8 to ensure the new pool
197-
// fragment start and end positions are aligned to an 8 byte boundary.
198-
const alignedEnd = (start + toRead) & ~7;
199-
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
200-
if (alignedEnd - alignedStart >= kMinPoolSpace) {
201-
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
208+
let b = null;
209+
// Now that we know how much data we have actually read, re-wind the
210+
// 'used' field if we can, and otherwise allow the remainder of our
211+
// reservation to be used as a new pool later.
212+
if (start + toRead === thisPool.used && thisPool === pool) {
213+
const newUsed = thisPool.used + bytesRead - toRead;
214+
thisPool.used = roundUpToMultipleOf8(newUsed);
215+
} else {
216+
// Round down to the next lowest multiple of 8 to ensure the new pool
217+
// fragment start and end positions are aligned to an 8 byte boundary.
218+
const alignedEnd = (start + toRead) & ~7;
219+
const alignedStart = roundUpToMultipleOf8(start + bytesRead);
220+
if (alignedEnd - alignedStart >= kMinPoolSpace) {
221+
poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
222+
}
202223
}
203-
}
204224

205-
if (bytesRead > 0) {
206-
this.bytesRead += bytesRead;
207-
b = thisPool.slice(start, start + bytesRead);
208-
}
225+
if (bytesRead > 0) {
226+
this.bytesRead += bytesRead;
227+
b = thisPool.slice(start, start + bytesRead);
228+
}
209229

210-
this.push(b);
211-
}
212-
});
230+
this.push(b);
231+
}
232+
});
213233

214234
// Move the pool positions, and internal position for reading.
215235
if (this.pos !== undefined)
@@ -233,7 +253,7 @@ ReadStream.prototype._destroy = function(err, cb) {
233253
};
234254

235255
function closeFsStream(stream, cb, err) {
236-
fs.close(stream.fd, (er) => {
256+
stream[kFs].close(stream.fd, (er) => {
237257
er = er || err;
238258
cb(er);
239259
stream.closed = true;
@@ -268,6 +288,40 @@ function WriteStream(path, options) {
268288
options.emitClose = false;
269289
}
270290

291+
this[kFs] = options.fs || fs;
292+
if (typeof this[kFs].open !== 'function') {
293+
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
294+
this[kFs].open);
295+
}
296+
297+
if (!this[kFs].write && !this[kFs].writev) {
298+
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
299+
this[kFs].write);
300+
}
301+
302+
if (this[kFs].write && typeof this[kFs].write !== 'function') {
303+
throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
304+
this[kFs].write);
305+
}
306+
307+
if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
308+
throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
309+
this[kFs].writev);
310+
}
311+
312+
if (typeof this[kFs].close !== 'function') {
313+
throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
314+
this[kFs].close);
315+
}
316+
317+
// It's enough to override either, in which case only one will be used.
318+
if (!this[kFs].write) {
319+
this._write = null;
320+
}
321+
if (!this[kFs].writev) {
322+
this._writev = null;
323+
}
324+
271325
Writable.call(this, options);
272326

273327
// Path will be ignored when fd is specified, so it can be falsy
@@ -313,7 +367,7 @@ WriteStream.prototype._final = function(callback) {
313367
};
314368

315369
WriteStream.prototype.open = function() {
316-
fs.open(this.path, this.flags, this.mode, (er, fd) => {
370+
this[kFs].open(this.path, this.flags, this.mode, (er, fd) => {
317371
if (er) {
318372
if (this.autoClose) {
319373
this.destroy();
@@ -339,7 +393,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
339393
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
340394

341395
this[kIsPerformingIO] = true;
342-
fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
396+
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
343397
this[kIsPerformingIO] = false;
344398
// Tell ._destroy() that it's safe to close the fd now.
345399
if (this.destroyed) {
@@ -383,7 +437,7 @@ WriteStream.prototype._writev = function(data, cb) {
383437
}
384438

385439
this[kIsPerformingIO] = true;
386-
fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
440+
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
387441
this[kIsPerformingIO] = false;
388442
// Tell ._destroy() that it's safe to close the fd now.
389443
if (this.destroyed) {

test/parallel/test-fs-read-stream.js

+11-2
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ const fixtures = require('../common/fixtures');
3131
const fn = fixtures.path('elipses.txt');
3232
const rangeFile = fixtures.path('x.txt');
3333

34-
{
34+
function test1(options) {
3535
let paused = false;
3636
let bytesRead = 0;
3737

38-
const file = fs.createReadStream(fn);
38+
const file = fs.createReadStream(fn, options);
3939
const fileSize = fs.statSync(fn).size;
4040

4141
assert.strictEqual(file.bytesRead, 0);
@@ -88,6 +88,15 @@ const rangeFile = fixtures.path('x.txt');
8888
});
8989
}
9090

91+
test1({});
92+
test1({
93+
fs: {
94+
open: common.mustCall(fs.open),
95+
read: common.mustCallAtLeast(fs.read, 1),
96+
close: common.mustCall(fs.close),
97+
}
98+
});
99+
91100
{
92101
const file = fs.createReadStream(fn, { encoding: 'utf8' });
93102
file.length = 0;
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
const common = require('../common');
3+
const path = require('path');
4+
const fs = require('fs');
5+
6+
const tmpdir = require('../common/tmpdir');
7+
tmpdir.refresh();
8+
9+
{
10+
const file = path.join(tmpdir.path, 'write-end-test0.txt');
11+
const stream = fs.createWriteStream(file, {
12+
fs: {
13+
open: common.mustCall(fs.open),
14+
write: common.mustCallAtLeast(fs.write, 1),
15+
close: common.mustCall(fs.close),
16+
}
17+
});
18+
stream.end('asd');
19+
stream.on('close', common.mustCall());
20+
}
21+
22+
23+
{
24+
const file = path.join(tmpdir.path, 'write-end-test1.txt');
25+
const stream = fs.createWriteStream(file, {
26+
fs: {
27+
open: common.mustCall(fs.open),
28+
write: fs.write,
29+
writev: common.mustCallAtLeast(fs.writev, 1),
30+
close: common.mustCall(fs.close),
31+
}
32+
});
33+
stream.write('asd');
34+
stream.write('asd');
35+
stream.write('asd');
36+
stream.end();
37+
stream.on('close', common.mustCall());
38+
}

0 commit comments

Comments
 (0)