Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.

Commit 517d09d

Browse files
committed
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase. PR-URL: #165 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Daniel Bevenius <daniel.bevenius@gmail.com>
1 parent 107b141 commit 517d09d

File tree

3 files changed

+255
-56
lines changed

3 files changed

+255
-56
lines changed

lib/dgram.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
214214
if (arguments.length && typeof arguments[arguments.length - 1] === 'function')
215215
this.once('listening', arguments[arguments.length - 1]);
216216

217-
if (port instanceof UDP) {
217+
if (port !== null &&
218+
typeof port === 'object' &&
219+
typeof port.recvStart === 'function') {
218220
replaceHandle(this, port);
219221
startListening(this);
220222
return this;

src/udp_wrap.cc

+145-51
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,57 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
99+
100+
UDPWrapBase* UDPWrapBase::FromObject(Local<Object> obj) {
101+
CHECK_GT(obj->InternalFieldCount(), kUDPWrapBaseField);
102+
return static_cast<UDPWrapBase*>(
103+
obj->GetAlignedPointerFromInternalField(kUDPWrapBaseField));
104+
}
105+
106+
void UDPWrapBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
107+
env->SetProtoMethod(t, "recvStart", RecvStart);
108+
env->SetProtoMethod(t, "recvStop", RecvStop);
109+
}
76110

77111
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78112
: HandleWrap(env,
79113
object,
80114
reinterpret_cast<uv_handle_t*>(&handle_),
81115
AsyncWrap::PROVIDER_UDPWRAP) {
116+
object->SetAlignedPointerInInternalField(
117+
kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
118+
82119
int r = uv_udp_init(env->event_loop(), &handle_);
83120
CHECK_EQ(r, 0); // can't fail anyway
121+
122+
set_listener(this);
84123
}
85124

86125

@@ -91,7 +130,7 @@ void UDPWrap::Initialize(Local<Object> target,
91130
Environment* env = Environment::GetCurrent(context);
92131

93132
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
94-
t->InstanceTemplate()->SetInternalFieldCount(1);
133+
t->InstanceTemplate()->SetInternalFieldCount(kUDPWrapBaseField + 1);
95134
Local<String> udpString =
96135
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
97136
t->SetClassName(udpString);
@@ -112,6 +151,7 @@ void UDPWrap::Initialize(Local<Object> target,
112151
Local<FunctionTemplate>(),
113152
attributes);
114153

154+
UDPWrapBase::AddMethods(env, t);
115155
env->SetProtoMethod(t, "open", Open);
116156
env->SetProtoMethod(t, "bind", Bind);
117157
env->SetProtoMethod(t, "connect", Connect);
@@ -120,8 +160,6 @@ void UDPWrap::Initialize(Local<Object> target,
120160
env->SetProtoMethod(t, "connect6", Connect6);
121161
env->SetProtoMethod(t, "send6", Send6);
122162
env->SetProtoMethod(t, "disconnect", Disconnect);
123-
env->SetProtoMethod(t, "recvStart", RecvStart);
124-
env->SetProtoMethod(t, "recvStop", RecvStop);
125163
env->SetProtoMethod(t, "getpeername",
126164
GetSockOrPeerName<UDPWrap, uv_udp_getpeername>);
127165
env->SetProtoMethod(t, "getsockname",
@@ -216,6 +254,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
216254
flags);
217255
}
218256

257+
if (err == 0)
258+
wrap->listener()->OnAfterBind();
259+
219260
args.GetReturnValue().Set(err);
220261
}
221262

@@ -422,14 +463,14 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
422463
CHECK(args[3]->IsBoolean());
423464
}
424465

425-
Local<Object> req_wrap_obj = args[0].As<Object>();
426466
Local<Array> chunks = args[1].As<Array>();
427467
// it is faster to fetch the length of the
428468
// array in js-land
429469
size_t count = args[2].As<Uint32>()->Value();
430-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
431470

432-
size_t msg_size = 0;
471+
wrap->current_send_req_wrap_ = args[0].As<Object>();
472+
wrap->current_send_has_callback_ =
473+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
433474

434475
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
435476

@@ -440,7 +481,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
440481
size_t length = Buffer::Length(chunk);
441482

442483
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
443-
msg_size += length;
444484
}
445485

446486
int err = 0;
@@ -455,9 +495,25 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
455495
}
456496
}
457497

458-
uv_buf_t* bufs_ptr = *bufs;
459-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
460-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
498+
if (err == 0) {
499+
err = wrap->Send(*bufs, count, addr);
500+
}
501+
502+
args.GetReturnValue().Set(err);
503+
}
504+
505+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
506+
size_t count,
507+
const sockaddr* addr) {
508+
if (IsHandleClosing()) return UV_EBADF;
509+
510+
size_t msg_size = 0;
511+
for (size_t i = 0; i < count; i++)
512+
msg_size += bufs_ptr[i].len;
513+
514+
int err = 0;
515+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
516+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
461517
if (err == UV_ENOSYS || err == UV_EAGAIN) {
462518
err = 0;
463519
} else if (err >= 0) {
@@ -475,28 +531,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
475531
CHECK_EQ(static_cast<size_t>(err), msg_size);
476532
// + 1 so that the JS side can distinguish 0-length async sends from
477533
// 0-length sync sends.
478-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
479-
return;
534+
return msg_size + 1;
480535
}
481536
}
482537
}
483538

484539
if (err == 0) {
485-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
486-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
487-
req_wrap->msg_size = msg_size;
488-
489-
err = req_wrap->Dispatch(uv_udp_send,
490-
&wrap->handle_,
491-
bufs_ptr,
492-
count,
493-
addr,
494-
OnSend);
540+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
541+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
542+
if (req_wrap == nullptr) return UV_ENOSYS;
543+
544+
err = req_wrap->Dispatch(
545+
uv_udp_send,
546+
&handle_,
547+
bufs_ptr,
548+
count,
549+
addr,
550+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
551+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
552+
self->listener()->OnSendDone(
553+
ReqWrap<uv_udp_send_t>::from_req(req), status);
554+
}});
495555
if (err)
496556
delete req_wrap;
497557
}
498558

499-
args.GetReturnValue().Set(err);
559+
return err;
560+
}
561+
562+
563+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
564+
SendWrap* req_wrap = new SendWrap(env(),
565+
current_send_req_wrap_,
566+
current_send_has_callback_);
567+
req_wrap->msg_size = msg_size;
568+
return req_wrap;
500569
}
501570

502571

@@ -510,31 +579,46 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
510579
}
511580

512581

513-
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
514-
UDPWrap* wrap;
515-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
516-
args.Holder(),
517-
args.GetReturnValue().Set(UV_EBADF));
518-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
582+
AsyncWrap* UDPWrap::GetAsyncWrap() {
583+
return this;
584+
}
585+
586+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
587+
return uv_udp_getpeername(&handle_, name, namelen);
588+
}
589+
590+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
591+
return uv_udp_getsockname(&handle_, name, namelen);
592+
}
593+
594+
void UDPWrapBase::RecvStart(const FunctionCallbackInfo<Value>& args) {
595+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
596+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStart());
597+
}
598+
599+
int UDPWrap::RecvStart() {
600+
if (IsHandleClosing()) return UV_EBADF;
601+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
519602
// UV_EALREADY means that the socket is already bound but that's okay
520603
if (err == UV_EALREADY)
521604
err = 0;
522-
args.GetReturnValue().Set(err);
605+
return err;
523606
}
524607

525608

526-
void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
527-
UDPWrap* wrap;
528-
ASSIGN_OR_RETURN_UNWRAP(&wrap,
529-
args.Holder(),
530-
args.GetReturnValue().Set(UV_EBADF));
531-
int r = uv_udp_recv_stop(&wrap->handle_);
532-
args.GetReturnValue().Set(r);
609+
void UDPWrapBase::RecvStop(const FunctionCallbackInfo<Value>& args) {
610+
UDPWrapBase* wrap = UDPWrapBase::FromObject(args.Holder());
611+
args.GetReturnValue().Set(wrap == nullptr ? UV_EBADF : wrap->RecvStop());
612+
}
613+
614+
int UDPWrap::RecvStop() {
615+
if (IsHandleClosing()) return UV_EBADF;
616+
return uv_udp_recv_stop(&handle_);
533617
}
534618

535619

536-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
537-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
620+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
621+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
538622
if (req_wrap->have_callback()) {
539623
Environment* env = req_wrap->env();
540624
HandleScope handle_scope(env->isolate());
@@ -551,43 +635,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
551635
void UDPWrap::OnAlloc(uv_handle_t* handle,
552636
size_t suggested_size,
553637
uv_buf_t* buf) {
554-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
555-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
638+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
639+
reinterpret_cast<uv_udp_t*>(handle));
640+
*buf = wrap->listener()->OnAlloc(suggested_size);
641+
}
642+
643+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
644+
return env()->AllocateManaged(suggested_size).release();
556645
}
557646

558647
void UDPWrap::OnRecv(uv_udp_t* handle,
559648
ssize_t nread,
560-
const uv_buf_t* buf_,
561-
const struct sockaddr* addr,
649+
const uv_buf_t* buf,
650+
const sockaddr* addr,
562651
unsigned int flags) {
563-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
564-
Environment* env = wrap->env();
652+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
653+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
654+
}
565655

566-
AllocatedBuffer buf(env, *buf_);
656+
void UDPWrap::OnRecv(ssize_t nread,
657+
const uv_buf_t& buf_,
658+
const sockaddr* addr,
659+
unsigned int flags) {
660+
Environment* env = this->env();
661+
AllocatedBuffer buf(env, buf_);
567662
if (nread == 0 && addr == nullptr) {
568663
return;
569664
}
570665

571666
HandleScope handle_scope(env->isolate());
572667
Context::Scope context_scope(env->context());
573668

574-
Local<Object> wrap_obj = wrap->object();
575669
Local<Value> argv[] = {
576670
Integer::New(env->isolate(), nread),
577-
wrap_obj,
671+
object(),
578672
Undefined(env->isolate()),
579673
Undefined(env->isolate())
580674
};
581675

582676
if (nread < 0) {
583-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
677+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
584678
return;
585679
}
586680

587681
buf.Resize(nread);
588682
argv[2] = buf.ToBuffer().ToLocalChecked();
589683
argv[3] = AddressToJS(env, addr);
590-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
684+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
591685
}
592686

593687
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

0 commit comments

Comments
 (0)