Skip to content

Commit 322a998

Browse files
addaleaxtargos
authored andcommitted
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. Originally landed in the QUIC repo Original review metadata: ``` PR-URL: nodejs/quic#165 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com> ``` Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #31871 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
1 parent c979aea commit 322a998

File tree

3 files changed

+265
-60
lines changed

3 files changed

+265
-60
lines changed

lib/dgram.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
232232
this.on('listening', onListening);
233233
}
234234

235-
if (port instanceof UDP) {
235+
if (port !== null &&
236+
typeof port === 'object' &&
237+
typeof port.recvStart === 'function') {
236238
replaceHandle(this, port);
237239
startListening(this);
238240
return this;

src/udp_wrap.cc

+151-54
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
99+
100+
UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
101+
CHECK_GT(obj->InternalFieldCount(), UDPWrapBase::kUDPWrapBaseField);
102+
return static_cast<UDPWrapBase*>(
103+
obj->GetAlignedPointerFromInternalField(UDPWrapBase::kUDPWrapBaseField));
104+
}
105+
106+
void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
107+
env->SetProtoMethod(t, "recvStart", RecvStart);
108+
env->SetProtoMethod(t, "recvStop", RecvStop);
109+
}
76110

77111
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78112
: HandleWrap(env,
79113
object,
80114
reinterpret_cast<uv_handle_t*>(&handle_),
81115
AsyncWrap::PROVIDER_UDPWRAP) {
116+
object->SetAlignedPointerInInternalField(
117+
UDPWrapBase::kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
118+
82119
int r = uv_udp_init(env->event_loop(), &handle_);
83120
CHECK_EQ(r, 0); // can't fail anyway
121+
122+
set_listener(this);
84123
}
85124

86125

@@ -91,7 +130,8 @@ void UDPWrap::Initialize(Local<Object> target,
91130
Environment* env = Environment::GetCurrent(context);
92131

93132
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
94-
t->InstanceTemplate()->SetInternalFieldCount(UDPWrap::kInternalFieldCount);
133+
t->InstanceTemplate()->SetInternalFieldCount(
134+
UDPWrapBase::kInternalFieldCount);
95135
Local<String> udpString =
96136
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
97137
t->SetClassName(udpString);
@@ -112,6 +152,7 @@ void UDPWrap::Initialize(Local<Object> target,
112152
Local<FunctionTemplate>(),
113153
attributes);
114154

155+
UDPWrapBase::AddMethods(env, t);
115156
env->SetProtoMethod(t, "open", Open);
116157
env->SetProtoMethod(t, "bind", Bind);
117158
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +161,6 @@ void UDPWrap::Initialize(Local<Object> target,
120161
env->SetProtoMethod(t, "connect6", Connect6);
121162
env->SetProtoMethod(t, "send6", Send6);
122163
env->SetProtoMethod(t, "disconnect", Disconnect);
123-
env->SetProtoMethod(t, "recvStart", RecvStart);
124-
env->SetProtoMethod(t, "recvStop", RecvStop);
125164
env->SetProtoMethod(t, "getpeername",
126165
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
127166
env->SetProtoMethod(t, "getsockname",
@@ -220,6 +259,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
220259
flags);
221260
}
222261

262+
if (err == 0)
263+
wrap->listener()->OnAfterBind();
264+
223265
args.GetReturnValue().Set(err);
224266
}
225267

@@ -464,14 +506,10 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
464506
CHECK(args[3]->IsBoolean());
465507
}
466508

467-
Local<Object> req_wrap_obj = args[0].As<Object>();
468509
Local<Array> chunks = args[1].As<Array>();
469510
// it is faster to fetch the length of the
470511
// array in js-land
471512
size_t count = args[2].As<Uint32>()->Value();
472-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
473-
474-
size_t msg_size = 0;
475513

476514
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
477515

@@ -483,7 +521,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
483521
size_t length = Buffer::Length(chunk);
484522

485523
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
486-
msg_size += length;
487524
}
488525

489526
int err = 0;
@@ -493,14 +530,36 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
493530
const unsigned short port = args[3].As<Uint32>()->Value();
494531
node::Utf8Value address(env->isolate(), args[4]);
495532
err = sockaddr_for_family(family, address.out(), port, &addr_storage);
496-
if (err == 0) {
533+
if (err == 0)
497534
addr = reinterpret_cast<sockaddr*>(&addr_storage);
498-
}
499535
}
500536

501-
uv_buf_t* bufs_ptr = *bufs;
502-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
503-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
537+
if (err == 0) {
538+
wrap->current_send_req_wrap_ = args[0].As<Object>();
539+
wrap->current_send_has_callback_ =
540+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
541+
542+
err = wrap->Send(*bufs, count, addr);
543+
544+
wrap->current_send_req_wrap_.Clear();
545+
wrap->current_send_has_callback_ = false;
546+
}
547+
548+
args.GetReturnValue().Set(err);
549+
}
550+
551+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
552+
size_t count,
553+
const sockaddr* addr) {
554+
if (IsHandleClosing()) return UV_EBADF;
555+
556+
size_t msg_size = 0;
557+
for (size_t i = 0; i < count; i++)
558+
msg_size += bufs_ptr[i].len;
559+
560+
int err = 0;
561+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
562+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
504563
if (err == UV_ENOSYS || err == UV_EAGAIN) {
505564
err = 0;
506565
} else if (err >= 0) {
@@ -518,28 +577,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
518577
CHECK_EQ(static_cast<size_t>(err), msg_size);
519578
// + 1 so that the JS side can distinguish 0-length async sends from
520579
// 0-length sync sends.
521-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
522-
return;
580+
return msg_size + 1;
523581
}
524582
}
525583
}
526584

527585
if (err == 0) {
528-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
529-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
530-
req_wrap->msg_size = msg_size;
531-
532-
err = req_wrap->Dispatch(uv_udp_send,
533-
&wrap->handle_,
534-
bufs_ptr,
535-
count,
536-
addr,
537-
OnSend);
586+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
587+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
588+
if (req_wrap == nullptr) return UV_ENOSYS;
589+
590+
err = req_wrap->Dispatch(
591+
uv_udp_send,
592+
&handle_,
593+
bufs_ptr,
594+
count,
595+
addr,
596+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
597+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
598+
self->listener()->OnSendDone(
599+
ReqWrap<uv_udp_send_t>::from_req(req), status);
600+
}});
538601
if (err)
539602
delete req_wrap;
540603
}
541604

542-
args.GetReturnValue().Set(err);
605+
return err;
606+
}
607+
608+
609+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
610+
SendWrap* req_wrap = new SendWrap(env(),
611+
current_send_req_wrap_,
612+
current_send_has_callback_);
613+
req_wrap->msg_size = msg_size;
614+
return req_wrap;
543615
}
544616

545617

@@ -553,31 +625,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
553625
}
554626

555627

556-
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
557-
UDPWrap* wrap;
558-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
559-
args.Holder(),
560-
args.GetReturnValue().Set(UV_EBADF));
561-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
628+
AsyncWrap* UDPWrap::GetAsyncWrap() {
629+
return this;
630+
}
631+
632+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
633+
return uv_udp_getpeername(&handle_, name, namelen);
634+
}
635+
636+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
637+
return uv_udp_getsockname(&handle_, name, namelen);
638+
}
639+
640+
void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
641+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
642+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
643+
}
644+
645+
int UDPWrap::RecvStart() {
646+
if (IsHandleClosing()) return UV_EBADF;
647+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
562648
// UV_EALREADY means that the socket is already bound but that's okay
563649
if (err == UV_EALREADY)
564650
err = 0;
565-
args.GetReturnValue().Set(err);
651+
return err;
566652
}
567653

568654

569-
void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
570-
UDPWrap* wrap;
571-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
572-
args.Holder(),
573-
args.GetReturnValue().Set(UV_EBADF));
574-
int r = uv_udp_recv_stop(&wrap->handle_);
575-
args.GetReturnValue().Set(r);
655+
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
656+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
657+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
658+
}
659+
660+
int UDPWrap::RecvStop() {
661+
if (IsHandleClosing()) return UV_EBADF;
662+
return uv_udp_recv_stop(&handle_);
576663
}
577664

578665

579-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
580-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
666+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
667+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
581668
if (req_wrap->have_callback()) {
582669
Environment* env = req_wrap->env();
583670
HandleScope handle_scope(env->isolate());
@@ -594,43 +681,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
594681
void UDPWrap::OnAlloc(uv_handle_t* handle,
595682
size_t suggested_size,
596683
uv_buf_t* buf) {
597-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
598-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
684+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
685+
reinterpret_cast<uv_udp_t*>(handle));
686+
*buf = wrap->listener()->OnAlloc(suggested_size);
687+
}
688+
689+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
690+
return env()->AllocateManaged(suggested_size).release();
599691
}
600692

601693
void UDPWrap::OnRecv(uv_udp_t* handle,
602694
ssize_t nread,
603-
const uv_buf_t* buf_,
604-
const struct sockaddr* addr,
695+
const uv_buf_t* buf,
696+
const sockaddr* addr,
605697
unsigned int flags) {
606-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
607-
Environment* env = wrap->env();
698+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
699+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
700+
}
608701

609-
AllocatedBuffer buf(env, *buf_);
702+
void UDPWrap::OnRecv(ssize_t nread,
703+
const uv_buf_t& buf_,
704+
const sockaddr* addr,
705+
unsigned int flags) {
706+
Environment* env = this->env();
707+
AllocatedBuffer buf(env, buf_);
610708
if (nread == 0 && addr == nullptr) {
611709
return;
612710
}
613711

614712
HandleScope handle_scope(env->isolate());
615713
Context::Scope context_scope(env->context());
616714

617-
Local<Object> wrap_obj = wrap->object();
618715
Local<Value> argv[] = {
619716
Integer::New(env->isolate(), nread),
620-
wrap_obj,
717+
object(),
621718
Undefined(env->isolate()),
622719
Undefined(env->isolate())
623720
};
624721

625722
if (nread < 0) {
626-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
723+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
627724
return;
628725
}
629726

630727
buf.Resize(nread);
631728
argv[2] = buf.ToBuffer().ToLocalChecked();
632729
argv[3] = AddressToJS(env, addr);
633-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
730+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
634731
}
635732

636733
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

0 commit comments

Comments
 (0)