Skip to content

Commit 56fe9ba

Browse files
addaleaxtargos
authored andcommitted
worker: refactor MessagePort entanglement management
This addresses the `TODO` left on my request in 9e446b3. :) Refs: #36271 PR-URL: #36345 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 9891811 commit 56fe9ba

File tree

2 files changed

+35
-39
lines changed

2 files changed

+35
-39
lines changed

src/node_messaging.cc

+28-30
Original file line numberDiff line numberDiff line change
@@ -515,19 +515,8 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
515515
tracker->TrackField("transferables", transferables_);
516516
}
517517

518-
// TODO(@jasnell): The name here will be an empty string if the
519-
// one-to-one MessageChannel is used. In such cases,
520-
// SiblingGroup::Get() will return nothing and group_ will be
521-
// an empty pointer. @addaleax suggests that the code here
522-
// could be clearer if attaching the SiblingGroup were a
523-
// separate step rather than part of the constructor here.
524-
MessagePortData::MessagePortData(
525-
MessagePort* owner,
526-
const std::string& name)
527-
: owner_(owner),
528-
group_(SiblingGroup::Get(name)) {
529-
if (group_)
530-
group_->Entangle(this);
518+
MessagePortData::MessagePortData(MessagePort* owner)
519+
: owner_(owner) {
531520
}
532521

533522
MessagePortData::~MessagePortData() {
@@ -552,17 +541,13 @@ void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
552541
}
553542

554543
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
555-
CHECK(!a->group_);
556-
CHECK(!b->group_);
557-
b->group_ = a->group_ = std::make_shared<SiblingGroup>();
558-
a->group_->Entangle(a);
559-
a->group_->Entangle(b);
544+
auto group = std::make_shared<SiblingGroup>();
545+
group->Entangle({a, b});
560546
}
561547

562548
void MessagePortData::Disentangle() {
563549
if (group_) {
564550
group_->Disentangle(this);
565-
group_.reset();
566551
}
567552
}
568553

@@ -572,13 +557,12 @@ MessagePort::~MessagePort() {
572557

573558
MessagePort::MessagePort(Environment* env,
574559
Local<Context> context,
575-
Local<Object> wrap,
576-
const std::string& name)
560+
Local<Object> wrap)
577561
: HandleWrap(env,
578562
wrap,
579563
reinterpret_cast<uv_handle_t*>(&async_),
580564
AsyncWrap::PROVIDER_MESSAGEPORT),
581-
data_(new MessagePortData(this, name)) {
565+
data_(new MessagePortData(this)) {
582566
auto onmessage = [](uv_async_t* handle) {
583567
// Called when data has been put into the queue.
584568
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
@@ -645,7 +629,7 @@ MessagePort* MessagePort::New(
645629
Environment* env,
646630
Local<Context> context,
647631
std::unique_ptr<MessagePortData> data,
648-
const std::string& name) {
632+
std::shared_ptr<SiblingGroup> sibling_group) {
649633
Context::Scope context_scope(context);
650634
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
651635

@@ -654,14 +638,15 @@ MessagePort* MessagePort::New(
654638
Local<Object> instance;
655639
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
656640
return nullptr;
657-
MessagePort* port = new MessagePort(env, context, instance, name);
641+
MessagePort* port = new MessagePort(env, context, instance);
658642
CHECK_NOT_NULL(port);
659643
if (port->IsHandleClosing()) {
660644
// Construction failed with an exception.
661645
return nullptr;
662646
}
663647

664648
if (data) {
649+
CHECK(!sibling_group);
665650
port->Detach();
666651
port->data_ = std::move(data);
667652

@@ -673,6 +658,8 @@ MessagePort* MessagePort::New(
673658
// If the existing MessagePortData object had pending messages, this is
674659
// the easiest way to run that queue.
675660
port->TriggerAsync();
661+
} else if (sibling_group) {
662+
sibling_group->Entangle(port->data_.get());
676663
}
677664
return port;
678665
}
@@ -1067,7 +1054,7 @@ void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
10671054
}
10681055

10691056
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
1070-
Entangle(a, b->data_.get());
1057+
MessagePortData::Entangle(a->data_.get(), b->data_.get());
10711058
}
10721059

10731060
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
@@ -1274,7 +1261,6 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
12741261
}
12751262

12761263
std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
1277-
if (name.empty()) return {};
12781264
Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
12791265
std::shared_ptr<SiblingGroup> group;
12801266
auto i = groups_.find(name);
@@ -1348,14 +1334,24 @@ Maybe<bool> SiblingGroup::Dispatch(
13481334
return Just(true);
13491335
}
13501336

1351-
void SiblingGroup::Entangle(MessagePortData* data) {
1337+
void SiblingGroup::Entangle(MessagePortData* port) {
1338+
Entangle({ port });
1339+
}
1340+
1341+
void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
13521342
Mutex::ScopedLock lock(group_mutex_);
1353-
ports_.insert(data);
1343+
for (MessagePortData* data : ports) {
1344+
ports_.insert(data);
1345+
CHECK(!data->group_);
1346+
data->group_ = shared_from_this();
1347+
}
13541348
}
13551349

13561350
void SiblingGroup::Disentangle(MessagePortData* data) {
1351+
auto self = shared_from_this(); // Keep alive until end of function.
13571352
Mutex::ScopedLock lock(group_mutex_);
13581353
ports_.erase(data);
1354+
data->group_.reset();
13591355

13601356
data->AddToIncomingQueue(std::make_shared<Message>());
13611357
// If this is an anonymous group and there's another port, close it.
@@ -1407,8 +1403,10 @@ static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
14071403
Context::Scope context_scope(env->context());
14081404
Utf8Value name(env->isolate(), args[0]);
14091405
MessagePort* port =
1410-
MessagePort::New(env, env->context(), nullptr, std::string(*name));
1411-
args.GetReturnValue().Set(port->object());
1406+
MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
1407+
if (port != nullptr) {
1408+
args.GetReturnValue().Set(port->object());
1409+
}
14121410
}
14131411

14141412
static void InitMessaging(Local<Object> target,

src/node_messaging.h

+7-9
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class Message : public MemoryRetainer {
110110
friend class MessagePort;
111111
};
112112

113-
class SiblingGroup {
113+
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
114114
public:
115115
// Named SiblingGroup, Used for one-to-many BroadcastChannels.
116116
static std::shared_ptr<SiblingGroup> Get(const std::string& name);
@@ -134,7 +134,7 @@ class SiblingGroup {
134134
std::string* error = nullptr);
135135

136136
void Entangle(MessagePortData* data);
137-
137+
void Entangle(std::initializer_list<MessagePortData*> data);
138138
void Disentangle(MessagePortData* data);
139139

140140
const std::string& name() const { return name_; }
@@ -159,9 +159,7 @@ class SiblingGroup {
159159
// a specific Environment/Isolate/event loop, for easier transfer between those.
160160
class MessagePortData : public TransferData {
161161
public:
162-
explicit MessagePortData(
163-
MessagePort* owner,
164-
const std::string& name = std::string());
162+
explicit MessagePortData(MessagePort* owner);
165163
~MessagePortData() override;
166164

167165
MessagePortData(MessagePortData&& other) = delete;
@@ -203,6 +201,7 @@ class MessagePortData : public TransferData {
203201
MessagePort* owner_ = nullptr;
204202
std::shared_ptr<SiblingGroup> group_;
205203
friend class MessagePort;
204+
friend class SiblingGroup;
206205
};
207206

208207
// A message port that receives messages from other threads, including
@@ -216,8 +215,7 @@ class MessagePort : public HandleWrap {
216215
// creating MessagePort instances.
217216
MessagePort(Environment* env,
218217
v8::Local<v8::Context> context,
219-
v8::Local<v8::Object> wrap,
220-
const std::string& name = std::string());
218+
v8::Local<v8::Object> wrap);
221219

222220
public:
223221
~MessagePort() override;
@@ -226,8 +224,8 @@ class MessagePort : public HandleWrap {
226224
// `MessagePortData` object.
227225
static MessagePort* New(Environment* env,
228226
v8::Local<v8::Context> context,
229-
std::unique_ptr<MessagePortData> data = nullptr,
230-
const std::string& name = std::string());
227+
std::unique_ptr<MessagePortData> data = {},
228+
std::shared_ptr<SiblingGroup> sibling_group = {});
231229

232230
// Send a message, i.e. deliver it into the sibling's incoming queue.
233231
// If this port is closed, or if there is no sibling, this message is

0 commit comments

Comments
 (0)