Skip to content

Commit 4dc59b9

Browse files
addaleaxMylesBorins
authored andcommitted
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. Originally landed in the QUIC repo Original review metadata: ``` PR-URL: nodejs/quic#165 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com> ``` Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #31871 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
1 parent 478450d commit 4dc59b9

File tree

3 files changed

+265
-60
lines changed

3 files changed

+265
-60
lines changed

lib/dgram.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
232232
this.on('listening', onListening);
233233
}
234234

235-
if (port instanceof UDP) {
235+
if (port !== null &&
236+
typeof port === 'object' &&
237+
typeof port.recvStart === 'function') {
236238
replaceHandle(this, port);
237239
startListening(this);
238240
return this;

src/udp_wrap.cc

+151-54
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
99+
100+
UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
101+
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
102+
return static_cast<UDPWrapBase*>(
103+
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
104+
}
105+
106+
void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
107+
env->SetProtoMethod(t, "recvStart", RecvStart);
108+
env->SetProtoMethod(t, "recvStop", RecvStop);
109+
}
76110

77111
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78112
: HandleWrap(env,
79113
object,
80114
reinterpret_cast<uv_handle_t*>(&handle_),
81115
AsyncWrap::PROVIDER_UDPWRAP) {
116+
object->SetAlignedPointerInInternalField(
117+
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
118+
82119
int r = uv_udp_init(env->event_loop(), &handle_);
83120
CHECK_EQ(r, 0); // can't fail anyway
121+
122+
set_listener(this);
84123
}
85124

86125

@@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
91130
Environment* env = Environment::GetCurrent(context);
92131

93132
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
94-
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
133+
t->InstanceTemplate()->SetInternalFieldCount(
134+
UDPWrapBase::kInternalFieldCount);
95135
Local<String> udpString =
96136
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
97137
t->SetClassName(udpString);
@@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
112152
Local<FunctionTemplate>(),
113153
attributes);
114154

155+
UDPWrapBase::AddMethods(env, t);
115156
env->SetProtoMethod(t, "open", Open);
116157
env->SetProtoMethod(t, "bind", Bind);
117158
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
120161
env->SetProtoMethod(t, "connect6", Connect6);
121162
env->SetProtoMethod(t, "send6", Send6);
122163
env->SetProtoMethod(t, "disconnect", Disconnect);
123-
env->SetProtoMethod(t, "recvStart", RecvStart);
124-
env->SetProtoMethod(t, "recvStop", RecvStop);
125164
env->SetProtoMethod(t, "getpeername",
126165
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
127166
env->SetProtoMethod(t, "getsockname",
@@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
220259
flags);
221260
}
222261

262+
if (err == 0)
263+
wrap->listener()->OnAfterBind();
264+
223265
args.GetReturnValue().Set(err);
224266
}
225267

@@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
464506
CHECK(args[3]->IsBoolean());
465507
}
466508

467-
Local<Object> req_wrap_obj = args[0].As<Object>();
468509
Local<Array> chunks = args[1].As<Array>();
469510
// it is faster to fetch the length of the
470511
// array in js-land
471512
size_t count = args[2].As<Uint32>()->Value();
472-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
473-
474-
size_t msg_size = 0;
475513

476514
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
477515

@@ -482,7 +520,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
482520
size_t length = Buffer::Length(chunk);
483521

484522
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
485-
msg_size += length;
486523
}
487524

488525
int err = 0;
@@ -492,14 +529,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
492529
const unsigned short port = args[3].As<Uint32>()->Value();
493530
node::Utf8Value address(env->isolate(), args[4]);
494531
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
495-
if (err == 0) {
532+
if (err == 0)
496533
addr = reinterpret_cast<sockaddr*>(&addr_storage);
497-
}
498534
}
499535

500-
uv_buf_t* bufs_ptr = *bufs;
501-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
502-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
536+
if (err == 0) {
537+
wrap->current_send_req_wrap_ = args[0].As<Object>();
538+
wrap->current_send_has_callback_ =
539+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
540+
541+
err = wrap->Send(*bufs, count, addr);
542+
543+
wrap->current_send_req_wrap_.Clear();
544+
wrap->current_send_has_callback_ = false;
545+
}
546+
547+
args.GetReturnValue().Set(err);
548+
}
549+
550+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
551+
size_t count,
552+
const sockaddr* addr) {
553+
if (IsHandleClosing()) return UV_EBADF;
554+
555+
size_t msg_size = 0;
556+
for (size_t i = 0; i < count; i++)
557+
msg_size += bufs_ptr[i].len;
558+
559+
int err = 0;
560+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
561+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
503562
if (err == UV_ENOSYS || err == UV_EAGAIN) {
504563
err = 0;
505564
} else if (err >= 0) {
@@ -517,28 +576,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
517576
CHECK_EQ(static_cast<size_t>(err), msg_size);
518577
// + 1 so that the JS side can distinguish 0-length async sends from
519578
// 0-length sync sends.
520-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
521-
return;
579+
return msg_size + 1;
522580
}
523581
}
524582
}
525583

526584
if (err == 0) {
527-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
528-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
529-
req_wrap->msg_size = msg_size;
530-
531-
err = req_wrap->Dispatch(uv_udp_send,
532-
&wrap->handle_,
533-
bufs_ptr,
534-
count,
535-
addr,
536-
OnSend);
585+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
586+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
587+
if (req_wrap == nullptr) return UV_ENOSYS;
588+
589+
err = req_wrap->Dispatch(
590+
uv_udp_send,
591+
&handle_,
592+
bufs_ptr,
593+
count,
594+
addr,
595+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
596+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
597+
self->listener()->OnSendDone(
598+
ReqWrap<uv_udp_send_t>::from_req(req), status);
599+
}});
537600
if (err)
538601
delete req_wrap;
539602
}
540603

541-
args.GetReturnValue().Set(err);
604+
return err;
605+
}
606+
607+
608+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
609+
SendWrap* req_wrap = new SendWrap(env(),
610+
current_send_req_wrap_,
611+
current_send_has_callback_);
612+
req_wrap->msg_size = msg_size;
613+
return req_wrap;
542614
}
543615

544616

@@ -552,31 +624,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
552624
}
553625

554626

555-
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
556-
UDPWrap* wrap;
557-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
558-
args.Holder(),
559-
args.GetReturnValue().Set(UV_EBADF));
560-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
627+
AsyncWrap* UDPWrap::GetAsyncWrap() {
628+
return this;
629+
}
630+
631+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
632+
return uv_udp_getpeername(&handle_, name, namelen);
633+
}
634+
635+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
636+
return uv_udp_getsockname(&handle_, name, namelen);
637+
}
638+
639+
void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
640+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
641+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
642+
}
643+
644+
int UDPWrap::RecvStart() {
645+
if (IsHandleClosing()) return UV_EBADF;
646+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
561647
// UV_EALREADY means that the socket is already bound but that's okay
562648
if (err == UV_EALREADY)
563649
err = 0;
564-
args.GetReturnValue().Set(err);
650+
return err;
565651
}
566652

567653

568-
void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
569-
UDPWrap* wrap;
570-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
571-
args.Holder(),
572-
args.GetReturnValue().Set(UV_EBADF));
573-
int r = uv_udp_recv_stop(&wrap->handle_);
574-
args.GetReturnValue().Set(r);
654+
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
655+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
656+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
657+
}
658+
659+
int UDPWrap::RecvStop() {
660+
if (IsHandleClosing()) return UV_EBADF;
661+
return uv_udp_recv_stop(&handle_);
575662
}
576663

577664

578-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
579-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
665+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
666+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
580667
if (req_wrap->have_callback()) {
581668
Environment* env = req_wrap->env();
582669
HandleScope handle_scope(env->isolate());
@@ -593,43 +680,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
593680
void UDPWrap::OnAlloc(uv_handle_t* handle,
594681
size_t suggested_size,
595682
uv_buf_t* buf) {
596-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
597-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
683+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
684+
reinterpret_cast<uv_udp_t*>(handle));
685+
*buf = wrap->listener()->OnAlloc(suggested_size);
686+
}
687+
688+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
689+
return env()->AllocateManaged(suggested_size).release();
598690
}
599691

600692
void UDPWrap::OnRecv(uv_udp_t* handle,
601693
ssize_t nread,
602-
const uv_buf_t* buf_,
603-
const struct sockaddr* addr,
694+
const uv_buf_t* buf,
695+
const sockaddr* addr,
604696
unsigned int flags) {
605-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
606-
Environment* env = wrap->env();
697+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
698+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
699+
}
607700

608-
AllocatedBuffer buf(env, *buf_);
701+
void UDPWrap::OnRecv(ssize_t nread,
702+
const uv_buf_t& buf_,
703+
const sockaddr* addr,
704+
unsigned int flags) {
705+
Environment* env = this->env();
706+
AllocatedBuffer buf(env, buf_);
609707
if (nread == 0 && addr == nullptr) {
610708
return;
611709
}
612710

613711
HandleScope handle_scope(env->isolate());
614712
Context::Scope context_scope(env->context());
615713

616-
Local<Object> wrap_obj = wrap->object();
617714
Local<Value> argv[] = {
618715
Integer::New(env->isolate(), nread),
619-
wrap_obj,
716+
object(),
620717
Undefined(env->isolate()),
621718
Undefined(env->isolate())
622719
};
623720

624721
if (nread < 0) {
625-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
722+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
626723
return;
627724
}
628725

629726
buf.Resize(nread);
630727
argv[2] = buf.ToBuffer().ToLocalChecked();
631728
argv[3] = AddressToJS(env, addr);
632-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
729+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
633730
}
634731

635732
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

0 commit comments

Comments
 (0)