Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.

Commit 641b7ed

Browse files
committed
dgram: make UDPWrap more reusable
Allow using the handle more directly for I/O in other parts of the codebase.
1 parent 8bf1931 commit 641b7ed

File tree

2 files changed

+193
-41
lines changed

2 files changed

+193
-41
lines changed

src/udp_wrap.cc

+125-39
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,46 @@ SendWrap::SendWrap(Environment* env,
6969
}
7070

7171

72-
inline bool SendWrap::have_callback() const {
72+
bool SendWrap::have_callback() const {
7373
return have_callback_;
7474
}
7575

76+
UDPListener::~UDPListener() {
77+
if (wrap_ != nullptr)
78+
wrap_->set_listener(nullptr);
79+
}
80+
81+
UDPWrapBase::~UDPWrapBase() {
82+
set_listener(nullptr);
83+
}
84+
85+
UDPListener* UDPWrapBase::listener() const {
86+
CHECK_NOT_NULL(listener_);
87+
return listener_;
88+
}
89+
90+
void UDPWrapBase::set_listener(UDPListener* listener) {
91+
if (listener_ != nullptr)
92+
listener_->wrap_ = nullptr;
93+
listener_ = listener;
94+
if (listener_ != nullptr) {
95+
CHECK_NULL(listener_->wrap_);
96+
listener_->wrap_ = this;
97+
}
98+
}
7699

77100
UDPWrap::UDPWrap(Environment* env, Local<Object> object)
78101
: HandleWrap(env,
79102
object,
80103
reinterpret_cast<uv_handle_t*>(&handle_),
81104
AsyncWrap::PROVIDER_UDPWRAP) {
105+
object->SetAlignedPointerInInternalField(
106+
kUDPWrapBaseField, static_cast<UDPWrapBase*>(this));
107+
82108
int r = uv_udp_init(env->event_loop(), &handle_);
83109
CHECK_EQ(r, 0); // can't fail anyway
110+
111+
set_listener(this);
84112
}
85113

86114

@@ -91,7 +119,7 @@ void UDPWrap::Initialize(Local<Object> target,
91119
Environment* env = Environment::GetCurrent(context);
92120

93121
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
94-
t->InstanceTemplate()->SetInternalFieldCount(1);
122+
t->InstanceTemplate()->SetInternalFieldCount(kUDPWrapBaseField + 1);
95123
Local<String> udpString =
96124
FIXED_ONE_BYTE_STRING(env->isolate(), "UDP");
97125
t->SetClassName(udpString);
@@ -216,6 +244,9 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
216244
flags);
217245
}
218246

247+
if (err == 0)
248+
wrap->listener()->OnAfterBind();
249+
219250
args.GetReturnValue().Set(err);
220251
}
221252

@@ -422,14 +453,14 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
422453
CHECK(args[3]->IsBoolean());
423454
}
424455

425-
Local<Object> req_wrap_obj = args[0].As<Object>();
426456
Local<Array> chunks = args[1].As<Array>();
427457
// it is faster to fetch the length of the
428458
// array in js-land
429459
size_t count = args[2].As<Uint32>()->Value();
430-
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
431460

432-
size_t msg_size = 0;
461+
wrap->current_send_req_wrap_ = args[0].As<Object>();
462+
wrap->current_send_has_callback_ =
463+
sendto ? args[5]->IsTrue() : args[3]->IsTrue();
433464

434465
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
435466

@@ -440,7 +471,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
440471
size_t length = Buffer::Length(chunk);
441472

442473
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
443-
msg_size += length;
444474
}
445475

446476
int err = 0;
@@ -455,9 +485,23 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
455485
}
456486
}
457487

458-
uv_buf_t* bufs_ptr = *bufs;
459-
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
460-
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
488+
if (err == 0) {
489+
err = wrap->Send(*bufs, count, addr);
490+
}
491+
492+
args.GetReturnValue().Set(err);
493+
}
494+
495+
ssize_t UDPWrap::Send(uv_buf_t* bufs_ptr,
496+
size_t count,
497+
const sockaddr* addr) {
498+
size_t msg_size = 0;
499+
for (size_t i = 0; i < count; i++)
500+
msg_size += bufs_ptr[i].len;
501+
502+
int err = 0;
503+
if (!UNLIKELY(env()->options()->test_udp_no_try_send)) {
504+
err = uv_udp_try_send(&handle_, bufs_ptr, count, addr);
461505
if (err == UV_ENOSYS || err == UV_EAGAIN) {
462506
err = 0;
463507
} else if (err >= 0) {
@@ -475,28 +519,41 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
475519
CHECK_EQ(static_cast<size_t>(err), msg_size);
476520
// + 1 so that the JS side can distinguish 0-length async sends from
477521
// 0-length sync sends.
478-
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
479-
return;
522+
return msg_size + 1;
480523
}
481524
}
482525
}
483526

484527
if (err == 0) {
485-
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
486-
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
487-
req_wrap->msg_size = msg_size;
488-
489-
err = req_wrap->Dispatch(uv_udp_send,
490-
&wrap->handle_,
491-
bufs_ptr,
492-
count,
493-
addr,
494-
OnSend);
528+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(this);
529+
ReqWrap<uv_udp_send_t>* req_wrap = listener()->CreateSendWrap(msg_size);
530+
if (req_wrap == nullptr) return UV_ENOSYS;
531+
532+
err = req_wrap->Dispatch(
533+
uv_udp_send,
534+
&handle_,
535+
bufs_ptr,
536+
count,
537+
addr,
538+
uv_udp_send_cb{[](uv_udp_send_t* req, int status) {
539+
UDPWrap* self = ContainerOf(&UDPWrap::handle_, req->handle);
540+
self->listener()->OnSendDone(
541+
ReqWrap<uv_udp_send_t>::from_req(req), status);
542+
}});
495543
if (err)
496544
delete req_wrap;
497545
}
498546

499-
args.GetReturnValue().Set(err);
547+
return err;
548+
}
549+
550+
551+
ReqWrap<uv_udp_send_t>* UDPWrap::CreateSendWrap(size_t msg_size) {
552+
SendWrap* req_wrap = new SendWrap(env(),
553+
current_send_req_wrap_,
554+
current_send_has_callback_);
555+
req_wrap->msg_size = msg_size;
556+
return req_wrap;
500557
}
501558

502559

@@ -510,16 +567,32 @@ void UDPWrap::Send6(const FunctionCallbackInfo<Value>& args) {
510567
}
511568

512569

570+
AsyncWrap* UDPWrap::GetAsyncWrap() {
571+
return this;
572+
}
573+
574+
int UDPWrap::GetPeerName(sockaddr* name, int* namelen) {
575+
return uv_udp_getpeername(&handle_, name, namelen);
576+
}
577+
578+
int UDPWrap::GetSockName(sockaddr* name, int* namelen) {
579+
return uv_udp_getsockname(&handle_, name, namelen);
580+
}
581+
513582
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) {
514583
UDPWrap* wrap;
515584
ASSIGN_OR_RETURN_UNWRAP(&wrap,
516585
args.Holder(),
517586
args.GetReturnValue().Set(UV_EBADF));
518-
int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv);
587+
args.GetReturnValue().Set(wrap->RecvStart());
588+
}
589+
590+
int UDPWrap::RecvStart() {
591+
int err = uv_udp_recv_start(&handle_, OnAlloc, OnRecv);
519592
// UV_EALREADY means that the socket is already bound but that's okay
520593
if (err == UV_EALREADY)
521594
err = 0;
522-
args.GetReturnValue().Set(err);
595+
return err;
523596
}
524597

525598

@@ -528,13 +601,16 @@ void UDPWrap::RecvStop(const FunctionCallbackInfo<Value>& args) {
528601
ASSIGN_OR_RETURN_UNWRAP(&wrap,
529602
args.Holder(),
530603
args.GetReturnValue().Set(UV_EBADF));
531-
int r = uv_udp_recv_stop(&wrap->handle_);
532-
args.GetReturnValue().Set(r);
604+
args.GetReturnValue().Set(wrap->RecvStop());
605+
}
606+
607+
int UDPWrap::RecvStop() {
608+
return uv_udp_recv_stop(&handle_);
533609
}
534610

535611

536-
void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
537-
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req->data)};
612+
void UDPWrap::OnSendDone(ReqWrap<uv_udp_send_t>* req, int status) {
613+
std::unique_ptr<SendWrap> req_wrap{static_cast<SendWrap*>(req)};
538614
if (req_wrap->have_callback()) {
539615
Environment* env = req_wrap->env();
540616
HandleScope handle_scope(env->isolate());
@@ -551,43 +627,53 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
551627
void UDPWrap::OnAlloc(uv_handle_t* handle,
552628
size_t suggested_size,
553629
uv_buf_t* buf) {
554-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
555-
*buf = wrap->env()->AllocateManaged(suggested_size).release();
630+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_,
631+
reinterpret_cast<uv_udp_t*>(handle));
632+
*buf = wrap->listener()->OnAlloc(suggested_size);
633+
}
634+
635+
uv_buf_t UDPWrap::OnAlloc(size_t suggested_size) {
636+
return env()->AllocateManaged(suggested_size).release();
556637
}
557638

558639
void UDPWrap::OnRecv(uv_udp_t* handle,
559640
ssize_t nread,
560-
const uv_buf_t* buf_,
561-
const struct sockaddr* addr,
641+
const uv_buf_t* buf,
642+
const sockaddr* addr,
562643
unsigned int flags) {
563-
UDPWrap* wrap = static_cast<UDPWrap*>(handle->data);
564-
Environment* env = wrap->env();
644+
UDPWrap* wrap = ContainerOf(&UDPWrap::handle_, handle);
645+
wrap->listener()->OnRecv(nread, *buf, addr, flags);
646+
}
565647

566-
AllocatedBuffer buf(env, *buf_);
648+
void UDPWrap::OnRecv(ssize_t nread,
649+
const uv_buf_t& buf_,
650+
const sockaddr* addr,
651+
unsigned int flags) {
652+
Environment* env = this->env();
653+
AllocatedBuffer buf(env, buf_);
567654
if (nread == 0 && addr == nullptr) {
568655
return;
569656
}
570657

571658
HandleScope handle_scope(env->isolate());
572659
Context::Scope context_scope(env->context());
573660

574-
Local<Object> wrap_obj = wrap->object();
575661
Local<Value> argv[] = {
576662
Integer::New(env->isolate(), nread),
577-
wrap_obj,
663+
object(),
578664
Undefined(env->isolate()),
579665
Undefined(env->isolate())
580666
};
581667

582668
if (nread < 0) {
583-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
669+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
584670
return;
585671
}
586672

587673
buf.Resize(nread);
588674
argv[2] = buf.ToBuffer().ToLocalChecked();
589675
argv[3] = AddressToJS(env, addr);
590-
wrap->MakeCallback(env->onmessage_string(), arraysize(argv), argv);
676+
MakeCallback(env->onmessage_string(), arraysize(argv), argv);
591677
}
592678

593679
MaybeLocal<Object> UDPWrap::Instantiate(Environment* env,

src/udp_wrap.h

+68-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,52 @@
3232

3333
namespace node {
3434

35-
class UDPWrap: public HandleWrap {
35+
class UDPWrapBase;
36+
37+
class UDPListener {
38+
public:
39+
virtual ~UDPListener();
40+
virtual uv_buf_t OnAlloc(size_t suggested_size) = 0;
41+
virtual void OnRecv(ssize_t nread,
42+
const uv_buf_t& buf,
43+
const sockaddr* addr,
44+
unsigned int flags) = 0;
45+
virtual ReqWrap<uv_udp_send_t>* CreateSendWrap(size_t msg_size) = 0;
46+
virtual void OnSendDone(ReqWrap<uv_udp_send_t>* wrap, int status) = 0;
47+
virtual void OnAfterBind() {}
48+
49+
inline UDPWrapBase* udp() const { return wrap_; }
50+
51+
protected:
52+
UDPWrapBase* wrap_ = nullptr;
53+
54+
friend class UDPWrapBase;
55+
};
56+
57+
class UDPWrapBase {
58+
public:
59+
static constexpr int kUDPWrapBaseField = 1;
60+
61+
virtual ~UDPWrapBase();
62+
virtual int RecvStart() = 0;
63+
virtual int RecvStop() = 0;
64+
virtual ssize_t Send(uv_buf_t* bufs,
65+
size_t nbufs,
66+
const sockaddr* addr) = 0;
67+
virtual int GetPeerName(sockaddr* name, int* namelen) = 0;
68+
virtual int GetSockName(sockaddr* name, int* namelen) = 0;
69+
virtual AsyncWrap* GetAsyncWrap() = 0;
70+
71+
void set_listener(UDPListener* listener);
72+
UDPListener* listener() const;
73+
74+
private:
75+
UDPListener* listener_ = nullptr;
76+
};
77+
78+
class UDPWrap final : public HandleWrap,
79+
public UDPWrapBase,
80+
public UDPListener {
3681
public:
3782
enum SocketType {
3883
SOCKET
@@ -64,6 +109,25 @@ class UDPWrap: public HandleWrap {
64109
static void SetTTL(const v8::FunctionCallbackInfo<v8::Value>& args);
65110
static void BufferSize(const v8::FunctionCallbackInfo<v8::Value>& args);
66111

112+
// UDPListener implementation
113+
uv_buf_t OnAlloc(size_t suggested_size) override;
114+
void OnRecv(ssize_t nread,
115+
const uv_buf_t& buf,
116+
const sockaddr* addr,
117+
unsigned int flags);
118+
ReqWrap<uv_udp_send_t>* CreateSendWrap(size_t msg_size) override;
119+
void OnSendDone(ReqWrap<uv_udp_send_t>* wrap, int status) override;
120+
121+
// UDPWrapBase implementation
122+
int RecvStart() override;
123+
int RecvStop() override;
124+
ssize_t Send(uv_buf_t* bufs,
125+
size_t nbufs,
126+
const sockaddr* addr) override;
127+
int GetPeerName(sockaddr* name, int* namelen) override;
128+
int GetSockName(sockaddr* name, int* namelen) override;
129+
AsyncWrap* GetAsyncWrap() override;
130+
67131
static v8::MaybeLocal<v8::Object> Instantiate(Environment* env,
68132
AsyncWrap* parent,
69133
SocketType type);
@@ -92,14 +156,16 @@ class UDPWrap: public HandleWrap {
92156
static void OnAlloc(uv_handle_t* handle,
93157
size_t suggested_size,
94158
uv_buf_t* buf);
95-
static void OnSend(uv_udp_send_t* req, int status);
96159
static void OnRecv(uv_udp_t* handle,
97160
ssize_t nread,
98161
const uv_buf_t* buf,
99162
const struct sockaddr* addr,
100163
unsigned int flags);
101164

102165
uv_udp_t handle_;
166+
167+
bool current_send_has_callback_;
168+
v8::Local<v8::Object> current_send_req_wrap_;
103169
};
104170

105171
} // namespace node

0 commit comments

Comments
 (0)