Skip to content

Commit 2bea9ce

Browse files
oyydmcollina
authored andcommitted
dgram: implement socket.bind({ fd })
dgram: Implement binding an existing `fd`. Allow pass a `fd` property to `socket.bind()` in dgram. src: Add `UDPWrap::Open` PR-URL: #21745 Fixes: #14961 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
1 parent 214844e commit 2bea9ce

10 files changed

+442
-39
lines changed

doc/api/dgram.md

+6
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ added: v0.11.14
166166
* `port` {integer}
167167
* `address` {string}
168168
* `exclusive` {boolean}
169+
* `fd` {integer}
169170
* `callback` {Function}
170171

171172
For UDP sockets, causes the `dgram.Socket` to listen for datagram
@@ -177,6 +178,11 @@ system will attempt to listen on all addresses. Once binding is
177178
complete, a `'listening'` event is emitted and the optional `callback`
178179
function is called.
179180

181+
The `options` object may contain a `fd` property. When a `fd` greater
182+
than `0` is set, it will wrap around an existing socket with the given
183+
file descriptor. In this case, the properties of `port` and `address`
184+
will be ignored.
185+
180186
Note that specifying both a `'listening'` event listener and passing a
181187
`callback` to the `socket.bind()` method is not harmful but not very
182188
useful.

lib/dgram.js

+72-19
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ const errors = require('internal/errors');
2525
const {
2626
kStateSymbol,
2727
_createSocketHandle,
28-
newHandle
28+
newHandle,
29+
guessHandleType,
2930
} = require('internal/dgram');
3031
const {
3132
ERR_INVALID_ARG_TYPE,
@@ -35,7 +36,8 @@ const {
3536
ERR_SOCKET_BAD_PORT,
3637
ERR_SOCKET_BUFFER_SIZE,
3738
ERR_SOCKET_CANNOT_SEND,
38-
ERR_SOCKET_DGRAM_NOT_RUNNING
39+
ERR_SOCKET_DGRAM_NOT_RUNNING,
40+
ERR_INVALID_FD_TYPE
3941
} = errors.codes;
4042
const { Buffer } = require('buffer');
4143
const util = require('util');
@@ -45,6 +47,7 @@ const {
4547
defaultTriggerAsyncIdScope,
4648
symbols: { async_id_symbol, owner_symbol }
4749
} = require('internal/async_hooks');
50+
const { isInt32 } = require('internal/validators');
4851
const { UV_UDP_REUSEADDR } = process.binding('constants').os;
4952

5053
const { UDP, SendWrap } = process.binding('udp_wrap');
@@ -151,6 +154,28 @@ function bufferSize(self, size, buffer) {
151154
return ret;
152155
}
153156

157+
// Query master process to get the server handle and utilize it.
158+
function bindServerHandle(self, options, errCb) {
159+
if (!cluster)
160+
cluster = require('cluster');
161+
162+
const state = self[kStateSymbol];
163+
cluster._getServer(self, options, (err, handle) => {
164+
if (err) {
165+
errCb(err);
166+
return;
167+
}
168+
169+
if (!state.handle) {
170+
// Handle has been closed in the mean time.
171+
return handle.close();
172+
}
173+
174+
replaceHandle(self, handle);
175+
startListening(self);
176+
});
177+
}
178+
154179
Socket.prototype.bind = function(port_, address_ /* , callback */) {
155180
let port = port_;
156181

@@ -171,6 +196,44 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
171196
return this;
172197
}
173198

199+
// Open an existing fd instead of creating a new one.
200+
if (port !== null && typeof port === 'object' &&
201+
isInt32(port.fd) && port.fd > 0) {
202+
const fd = port.fd;
203+
const exclusive = !!port.exclusive;
204+
const state = this[kStateSymbol];
205+
206+
if (!cluster)
207+
cluster = require('cluster');
208+
209+
if (cluster.isWorker && !exclusive) {
210+
bindServerHandle(this, {
211+
address: null,
212+
port: null,
213+
addressType: this.type,
214+
fd,
215+
flags: null
216+
}, (err) => {
217+
// Callback to handle error.
218+
const ex = errnoException(err, 'open');
219+
this.emit('error', ex);
220+
state.bindState = BIND_STATE_UNBOUND;
221+
});
222+
return this;
223+
}
224+
225+
const type = guessHandleType(fd);
226+
if (type !== 'UDP')
227+
throw new ERR_INVALID_FD_TYPE(type);
228+
const err = state.handle.open(fd);
229+
230+
if (err)
231+
throw errnoException(err, 'open');
232+
233+
startListening(this);
234+
return this;
235+
}
236+
174237
var address;
175238
var exclusive;
176239

@@ -207,28 +270,18 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
207270
flags |= UV_UDP_REUSEADDR;
208271

209272
if (cluster.isWorker && !exclusive) {
210-
const onHandle = (err, handle) => {
211-
if (err) {
212-
var ex = exceptionWithHostPort(err, 'bind', ip, port);
213-
this.emit('error', ex);
214-
state.bindState = BIND_STATE_UNBOUND;
215-
return;
216-
}
217-
218-
if (!state.handle)
219-
// handle has been closed in the mean time.
220-
return handle.close();
221-
222-
replaceHandle(this, handle);
223-
startListening(this);
224-
};
225-
cluster._getServer(this, {
273+
bindServerHandle(this, {
226274
address: ip,
227275
port: port,
228276
addressType: this.type,
229277
fd: -1,
230278
flags: flags
231-
}, onHandle);
279+
}, (err) => {
280+
// Callback to handle error.
281+
const ex = exceptionWithHostPort(err, 'bind', ip, port);
282+
this.emit('error', ex);
283+
state.bindState = BIND_STATE_UNBOUND;
284+
});
232285
} else {
233286
if (!state.handle)
234287
return; // handle has been closed in the mean time

lib/internal/dgram.js

+27-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
'use strict';
2-
const assert = require('assert');
32
const { codes } = require('internal/errors');
43
const { UDP } = process.binding('udp_wrap');
4+
const { isInt32 } = require('internal/validators');
5+
const TTYWrap = process.binding('tty_wrap');
6+
const { UV_EINVAL } = process.binding('uv');
57
const { ERR_INVALID_ARG_TYPE, ERR_SOCKET_BAD_TYPE } = codes;
68
const kStateSymbol = Symbol('state symbol');
79
let dns; // Lazy load for startup performance.
@@ -17,6 +19,9 @@ function lookup6(lookup, address, callback) {
1719
}
1820

1921

22+
const guessHandleType = TTYWrap.guessHandleType;
23+
24+
2025
function newHandle(type, lookup) {
2126
if (lookup === undefined) {
2227
if (dns === undefined) {
@@ -49,22 +54,32 @@ function newHandle(type, lookup) {
4954

5055

5156
function _createSocketHandle(address, port, addressType, fd, flags) {
52-
// Opening an existing fd is not supported for UDP handles.
53-
assert(typeof fd !== 'number' || fd < 0);
54-
5557
const handle = newHandle(addressType);
56-
57-
if (port || address) {
58-
const err = handle.bind(address, port || 0, flags);
59-
60-
if (err) {
61-
handle.close();
62-
return err;
58+
let err;
59+
60+
if (isInt32(fd) && fd > 0) {
61+
const type = guessHandleType(fd);
62+
if (type !== 'UDP') {
63+
err = UV_EINVAL;
64+
} else {
65+
err = handle.open(fd);
6366
}
67+
} else if (port || address) {
68+
err = handle.bind(address, port || 0, flags);
69+
}
70+
71+
if (err) {
72+
handle.close();
73+
return err;
6474
}
6575

6676
return handle;
6777
}
6878

6979

70-
module.exports = { kStateSymbol, _createSocketHandle, newHandle };
80+
module.exports = {
81+
kStateSymbol,
82+
_createSocketHandle,
83+
newHandle,
84+
guessHandleType,
85+
};

src/udp_wrap.cc

+13
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ void UDPWrap::Initialize(Local<Object> target,
117117
Local<FunctionTemplate>(),
118118
attributes);
119119

120+
env->SetProtoMethod(t, "open", Open);
120121
env->SetProtoMethod(t, "bind", Bind);
121122
env->SetProtoMethod(t, "send", Send);
122123
env->SetProtoMethod(t, "bind6", Bind6);
@@ -206,6 +207,18 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
206207
}
207208

208209

210+
void UDPWrap::Open(const FunctionCallbackInfo<Value>& args) {
211+
UDPWrap* wrap;
212+
ASSIGN_OR_RETURN_UNWRAP(&wrap,
213+
args.Holder(),
214+
args.GetReturnValue().Set(UV_EBADF));
215+
int fd = static_cast<int>(args[0]->IntegerValue());
216+
int err = uv_udp_open(&wrap->handle_, fd);
217+
218+
args.GetReturnValue().Set(err);
219+
}
220+
221+
209222
void UDPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
210223
DoBind(args, AF_INET);
211224
}

src/udp_wrap.h

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class UDPWrap: public HandleWrap {
4242
v8::Local<v8::Context> context);
4343
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
4444
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
45+
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
4546
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
4647
static void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
4748
static void Bind6(const v8::FunctionCallbackInfo<v8::Value>& args);
+108
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
'use strict';
2+
const common = require('../common');
3+
if (common.isWindows)
4+
common.skip('dgram clustering is currently not supported on Windows.');
5+
6+
const NUM_WORKERS = 4;
7+
const PACKETS_PER_WORKER = 10;
8+
9+
const assert = require('assert');
10+
const cluster = require('cluster');
11+
const dgram = require('dgram');
12+
const { UDP } = process.binding('udp_wrap');
13+
14+
if (cluster.isMaster)
15+
master();
16+
else
17+
worker();
18+
19+
20+
function master() {
21+
// Create a handle and use its fd.
22+
const rawHandle = new UDP();
23+
const err = rawHandle.bind(common.localhostIPv4, 0, 0);
24+
assert(err >= 0, String(err));
25+
assert.notStrictEqual(rawHandle.fd, -1);
26+
27+
const fd = rawHandle.fd;
28+
29+
let listening = 0;
30+
31+
// Fork 4 workers.
32+
for (let i = 0; i < NUM_WORKERS; i++)
33+
cluster.fork();
34+
35+
// Wait until all workers are listening.
36+
cluster.on('listening', common.mustCall((worker, address) => {
37+
if (++listening < NUM_WORKERS)
38+
return;
39+
40+
// Start sending messages.
41+
const buf = Buffer.from('hello world');
42+
const socket = dgram.createSocket('udp4');
43+
let sent = 0;
44+
doSend();
45+
46+
function doSend() {
47+
socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
48+
}
49+
50+
function afterSend() {
51+
sent++;
52+
if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
53+
doSend();
54+
} else {
55+
socket.close();
56+
}
57+
}
58+
}, NUM_WORKERS));
59+
60+
// Set up event handlers for every worker. Each worker sends a message when
61+
// it has received the expected number of packets. After that it disconnects.
62+
for (const key in cluster.workers) {
63+
if (cluster.workers.hasOwnProperty(key))
64+
setupWorker(cluster.workers[key]);
65+
}
66+
67+
function setupWorker(worker) {
68+
let received = 0;
69+
70+
worker.send({
71+
fd,
72+
});
73+
74+
worker.on('message', common.mustCall((msg) => {
75+
received = msg.received;
76+
worker.disconnect();
77+
}));
78+
79+
worker.on('exit', common.mustCall(() => {
80+
assert.strictEqual(received, PACKETS_PER_WORKER);
81+
}));
82+
}
83+
}
84+
85+
86+
function worker() {
87+
let received = 0;
88+
89+
process.on('message', common.mustCall((data) => {
90+
const { fd } = data;
91+
// Create udp socket and start listening.
92+
const socket = dgram.createSocket('udp4');
93+
94+
socket.on('message', common.mustCall((data, info) => {
95+
received++;
96+
97+
// Every 10 messages, notify the master.
98+
if (received === PACKETS_PER_WORKER) {
99+
process.send({ received });
100+
socket.close();
101+
}
102+
}, PACKETS_PER_WORKER));
103+
104+
socket.bind({
105+
fd,
106+
});
107+
}));
108+
}

0 commit comments

Comments
 (0)