Skip to content

Commit 6033d30

Browse files
mmomtchevdanielleadams
authored andcommitted
stream: add FileHandle support to Read/WriteStream
Support creating a Read/WriteStream from a FileHandle instead of a raw file descriptor Add an EventEmitter to FileHandle with a single 'close' event. Fixes: #35240 PR-URL: #35922 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 754b7a7 commit 6033d30

8 files changed

+285
-41
lines changed

doc/api/fs.md

+18-2
Original file line numberDiff line numberDiff line change
@@ -1751,6 +1751,10 @@ fs.copyFileSync('source.txt', 'destination.txt', COPYFILE_EXCL);
17511751
<!-- YAML
17521752
added: v0.1.31
17531753
changes:
1754+
- version:
1755+
- REPLACEME
1756+
pr-url: https://github.com/nodejs/node/pull/35922
1757+
description: The `fd` option accepts FileHandle arguments.
17541758
- version:
17551759
- v13.6.0
17561760
- v12.17.0
@@ -1782,7 +1786,7 @@ changes:
17821786
* `flags` {string} See [support of file system `flags`][]. **Default:**
17831787
`'r'`.
17841788
* `encoding` {string} **Default:** `null`
1785-
* `fd` {integer} **Default:** `null`
1789+
* `fd` {integer|FileHandle} **Default:** `null`
17861790
* `mode` {integer} **Default:** `0o666`
17871791
* `autoClose` {boolean} **Default:** `true`
17881792
* `emitClose` {boolean} **Default:** `false`
@@ -1858,6 +1862,10 @@ If `options` is a string, then it specifies the encoding.
18581862
<!-- YAML
18591863
added: v0.1.31
18601864
changes:
1865+
- version:
1866+
- REPLACEME
1867+
pr-url: https://github.com/nodejs/node/pull/35922
1868+
description: The `fd` option accepts FileHandle arguments.
18611869
- version:
18621870
- v13.6.0
18631871
- v12.17.0
@@ -1887,7 +1895,7 @@ changes:
18871895
* `flags` {string} See [support of file system `flags`][]. **Default:**
18881896
`'w'`.
18891897
* `encoding` {string} **Default:** `'utf8'`
1890-
* `fd` {integer} **Default:** `null`
1898+
* `fd` {integer|FileHandle} **Default:** `null`
18911899
* `mode` {integer} **Default:** `0o666`
18921900
* `autoClose` {boolean} **Default:** `true`
18931901
* `emitClose` {boolean} **Default:** `false`
@@ -4697,6 +4705,14 @@ the promise-based API uses the `FileHandle` class in order to help avoid
46974705
accidental leaking of unclosed file descriptors after a `Promise` is resolved or
46984706
rejected.
46994707

4708+
#### Event: `'close'`
4709+
<!-- YAML
4710+
added: REPLACEME
4711+
-->
4712+
4713+
The `'close'` event is emitted when the `FileHandle` and any of its underlying
4714+
resources (a file descriptor, for example) have been closed.
4715+
47004716
#### `filehandle.appendFile(data, options)`
47014717
<!-- YAML
47024718
added: v10.0.0

lib/internal/event_target.js

+17
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ const {
44
ArrayFrom,
55
Boolean,
66
Error,
7+
FunctionPrototypeCall,
78
NumberIsInteger,
89
ObjectAssign,
910
ObjectDefineProperties,
1011
ObjectDefineProperty,
1112
ObjectGetOwnPropertyDescriptor,
13+
ObjectGetOwnPropertyDescriptors,
1214
ReflectApply,
1315
SafeMap,
1416
String,
@@ -646,8 +648,23 @@ function defineEventHandler(emitter, name) {
646648
enumerable: true
647649
});
648650
}
651+
652+
const EventEmitterMixin = (Superclass) => {
653+
class MixedEventEmitter extends Superclass {
654+
constructor(...args) {
655+
super(...args);
656+
FunctionPrototypeCall(EventEmitter, this);
657+
}
658+
}
659+
const protoProps = ObjectGetOwnPropertyDescriptors(EventEmitter.prototype);
660+
delete protoProps.constructor;
661+
ObjectDefineProperties(MixedEventEmitter.prototype, protoProps);
662+
return MixedEventEmitter;
663+
};
664+
649665
module.exports = {
650666
Event,
667+
EventEmitterMixin,
651668
EventTarget,
652669
NodeEventTarget,
653670
defineEventHandler,

lib/internal/fs/promises.js

+26-12
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,16 @@ const {
7171
} = require('internal/validators');
7272
const pathModule = require('path');
7373
const { promisify } = require('internal/util');
74+
const { EventEmitterMixin } = require('internal/event_target');
7475

7576
const kHandle = Symbol('kHandle');
7677
const kFd = Symbol('kFd');
7778
const kRefs = Symbol('kRefs');
7879
const kClosePromise = Symbol('kClosePromise');
7980
const kCloseResolve = Symbol('kCloseResolve');
8081
const kCloseReject = Symbol('kCloseReject');
82+
const kRef = Symbol('kRef');
83+
const kUnref = Symbol('kUnref');
8184

8285
const { kUsePromises } = binding;
8386
const {
@@ -94,7 +97,7 @@ const lazyDOMException = hideStackFrames((message, name) => {
9497
return new DOMException(message, name);
9598
});
9699

97-
class FileHandle extends JSTransferable {
100+
class FileHandle extends EventEmitterMixin(JSTransferable) {
98101
constructor(filehandle) {
99102
super();
100103
this[kHandle] = filehandle;
@@ -197,6 +200,7 @@ class FileHandle extends JSTransferable {
197200
);
198201
}
199202

203+
this.emit('close');
200204
return this[kClosePromise];
201205
}
202206

@@ -226,6 +230,22 @@ class FileHandle extends JSTransferable {
226230
this[kHandle] = handle;
227231
this[kFd] = handle.fd;
228232
}
233+
234+
[kRef]() {
235+
this[kRefs]++;
236+
}
237+
238+
[kUnref]() {
239+
this[kRefs]--;
240+
if (this[kRefs] === 0) {
241+
this[kFd] = -1;
242+
PromisePrototypeThen(
243+
this[kHandle].close(),
244+
this[kCloseResolve],
245+
this[kCloseReject]
246+
);
247+
}
248+
}
229249
}
230250

231251
async function fsCall(fn, handle, ...args) {
@@ -242,18 +262,10 @@ async function fsCall(fn, handle, ...args) {
242262
}
243263

244264
try {
245-
handle[kRefs]++;
265+
handle[kRef]();
246266
return await fn(handle, ...args);
247267
} finally {
248-
handle[kRefs]--;
249-
if (handle[kRefs] === 0) {
250-
handle[kFd] = -1;
251-
PromisePrototypeThen(
252-
handle[kHandle].close(),
253-
handle[kCloseResolve],
254-
handle[kCloseReject]
255-
);
256-
}
268+
handle[kUnref]();
257269
}
258270
}
259271

@@ -712,5 +724,7 @@ module.exports = {
712724
readFile,
713725
},
714726

715-
FileHandle
727+
FileHandle,
728+
kRef,
729+
kUnref,
716730
};

lib/internal/fs/streams.js

+65-3
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,25 @@
22

33
const {
44
Array,
5+
FunctionPrototypeBind,
56
MathMin,
67
ObjectDefineProperty,
78
ObjectSetPrototypeOf,
9+
PromisePrototypeThen,
810
ReflectApply,
911
Symbol,
1012
} = primordials;
1113

1214
const {
1315
ERR_INVALID_ARG_TYPE,
14-
ERR_OUT_OF_RANGE
16+
ERR_OUT_OF_RANGE,
17+
ERR_METHOD_NOT_IMPLEMENTED,
1518
} = require('internal/errors').codes;
1619
const { deprecate } = require('internal/util');
1720
const { validateInteger } = require('internal/validators');
1821
const { errorOrDestroy } = require('internal/streams/destroy');
1922
const fs = require('fs');
23+
const { kRef, kUnref, FileHandle } = require('internal/fs/promises');
2024
const { Buffer } = require('buffer');
2125
const {
2226
copyObject,
@@ -28,6 +32,7 @@ const kIoDone = Symbol('kIoDone');
2832
const kIsPerformingIO = Symbol('kIsPerformingIO');
2933

3034
const kFs = Symbol('kFs');
35+
const kHandle = Symbol('kHandle');
3136

3237
function _construct(callback) {
3338
const stream = this;
@@ -66,6 +71,35 @@ function _construct(callback) {
6671
}
6772
}
6873

74+
// This generates an fs operations structure for a FileHandle
75+
const FileHandleOperations = (handle) => {
76+
return {
77+
open: (path, flags, mode, cb) => {
78+
throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
79+
},
80+
close: (fd, cb) => {
81+
handle[kUnref]();
82+
PromisePrototypeThen(handle.close(),
83+
() => cb(), cb);
84+
},
85+
read: (fd, buf, offset, length, pos, cb) => {
86+
PromisePrototypeThen(handle.read(buf, offset, length, pos),
87+
(r) => cb(null, r.bytesRead, r.buffer),
88+
(err) => cb(err, 0, buf));
89+
},
90+
write: (fd, buf, offset, length, pos, cb) => {
91+
PromisePrototypeThen(handle.write(buf, offset, length, pos),
92+
(r) => cb(null, r.bytesWritten, r.buffer),
93+
(err) => cb(err, 0, buf));
94+
},
95+
writev: (fd, buffers, pos, cb) => {
96+
PromisePrototypeThen(handle.writev(buffers, pos),
97+
(r) => cb(null, r.bytesWritten, r.buffers),
98+
(err) => cb(err, 0, buffers));
99+
}
100+
};
101+
};
102+
69103
function close(stream, err, cb) {
70104
if (!stream.fd) {
71105
// TODO(ronag)
@@ -80,6 +114,32 @@ function close(stream, err, cb) {
80114
}
81115
}
82116

117+
function importFd(stream, options) {
118+
stream.fd = null;
119+
if (options.fd) {
120+
if (typeof options.fd === 'number') {
121+
// When fd is a raw descriptor, we must keep our fingers crossed
122+
// that the descriptor won't get closed, or worse, replaced with
123+
// another one
124+
// https://github.com/nodejs/node/issues/35862
125+
stream.fd = options.fd;
126+
} else if (typeof options.fd === 'object' &&
127+
options.fd instanceof FileHandle) {
128+
// When fd is a FileHandle we can listen for 'close' events
129+
if (options.fs)
130+
// FileHandle is not supported with custom fs operations
131+
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
132+
stream[kHandle] = options.fd;
133+
stream.fd = options.fd.fd;
134+
stream[kFs] = FileHandleOperations(stream[kHandle]);
135+
stream[kHandle][kRef]();
136+
options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
137+
} else
138+
throw ERR_INVALID_ARG_TYPE('options.fd',
139+
['number', 'FileHandle'], options.fd);
140+
}
141+
}
142+
83143
function ReadStream(path, options) {
84144
if (!(this instanceof ReadStream))
85145
return new ReadStream(path, options);
@@ -115,10 +175,11 @@ function ReadStream(path, options) {
115175

116176
// Path will be ignored when fd is specified, so it can be falsy
117177
this.path = toPathIfFileURL(path);
118-
this.fd = options.fd === undefined ? null : options.fd;
119178
this.flags = options.flags === undefined ? 'r' : options.flags;
120179
this.mode = options.mode === undefined ? 0o666 : options.mode;
121180

181+
importFd(this, options);
182+
122183
this.start = options.start;
123184
this.end = options.end;
124185
this.pos = undefined;
@@ -287,10 +348,11 @@ function WriteStream(path, options) {
287348

288349
// Path will be ignored when fd is specified, so it can be falsy
289350
this.path = toPathIfFileURL(path);
290-
this.fd = options.fd === undefined ? null : options.fd;
291351
this.flags = options.flags === undefined ? 'w' : options.flags;
292352
this.mode = options.mode === undefined ? 0o666 : options.mode;
293353

354+
importFd(this, options);
355+
294356
this.start = options.start;
295357
this.pos = undefined;
296358
this.bytesWritten = 0;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict';
2+
const common = require('../common');
3+
const fs = require('fs');
4+
const assert = require('assert');
5+
const path = require('path');
6+
const tmpdir = require('../common/tmpdir');
7+
const file = path.join(tmpdir.path, 'read_stream_filehandle_worker.txt');
8+
const input = 'hello world';
9+
const { Worker, isMainThread, workerData } = require('worker_threads');
10+
11+
if (isMainThread || !workerData) {
12+
tmpdir.refresh();
13+
fs.writeFileSync(file, input);
14+
15+
fs.promises.open(file, 'r').then((handle) => {
16+
handle.on('close', common.mustNotCall());
17+
new Worker(__filename, {
18+
workerData: { handle },
19+
transferList: [handle]
20+
});
21+
});
22+
fs.promises.open(file, 'r').then((handle) => {
23+
fs.createReadStream(null, { fd: handle });
24+
assert.throws(() => {
25+
new Worker(__filename, {
26+
workerData: { handle },
27+
transferList: [handle]
28+
});
29+
}, {
30+
code: 25,
31+
});
32+
});
33+
} else {
34+
let output = '';
35+
36+
const handle = workerData.handle;
37+
handle.on('close', common.mustCall());
38+
const stream = fs.createReadStream(null, { fd: handle });
39+
40+
stream.on('data', common.mustCallAtLeast((data) => {
41+
output += data;
42+
}));
43+
44+
stream.on('end', common.mustCall(() => {
45+
handle.close();
46+
assert.strictEqual(output, input);
47+
}));
48+
49+
stream.on('close', common.mustCall());
50+
}

0 commit comments

Comments
 (0)