From 6334542a5f3e1ae7729403f1ae11e2cf31e82d8f Mon Sep 17 00:00:00 2001
From: Anna Henningsen <anna@addaleax.net>
Date: Tue, 1 Dec 2020 21:19:11 +0100
Subject: [PATCH] worker: refactor MessagePort entanglement management

This addresses the `TODO` left on my request in 9e446b3e9. :)

Refs: https://github.com/nodejs/node/pull/36271
---
 src/node_messaging.cc | 58 +++++++++++++++++++++----------------------
 src/node_messaging.h  | 16 ++++++------
 2 files changed, 35 insertions(+), 39 deletions(-)

diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 74f75071429c1e..03c6d5aaa76ebd 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -515,19 +515,8 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
   tracker->TrackField("transferables", transferables_);
 }
 
-// TODO(@jasnell): The name here will be an empty string if the
-// one-to-one MessageChannel is used. In such cases,
-// SiblingGroup::Get() will return nothing and group_ will be
-// an empty pointer. @addaleax suggests that the code here
-// could be clearer if attaching the SiblingGroup were a
-// separate step rather than part of the constructor here.
-MessagePortData::MessagePortData(
-    MessagePort* owner,
-    const std::string& name)
-    : owner_(owner),
-      group_(SiblingGroup::Get(name)) {
-  if (group_)
-    group_->Entangle(this);
+MessagePortData::MessagePortData(MessagePort* owner)
+    : owner_(owner) {
 }
 
 MessagePortData::~MessagePortData() {
@@ -552,17 +541,13 @@ void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
 }
 
 void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
-  CHECK(!a->group_);
-  CHECK(!b->group_);
-  b->group_ = a->group_ = std::make_shared<SiblingGroup>();
-  a->group_->Entangle(a);
-  a->group_->Entangle(b);
+  auto group = std::make_shared<SiblingGroup>();
+  group->Entangle({a, b});
 }
 
 void MessagePortData::Disentangle() {
   if (group_) {
     group_->Disentangle(this);
-    group_.reset();
   }
 }
 
@@ -572,13 +557,12 @@ MessagePort::~MessagePort() {
 
 MessagePort::MessagePort(Environment* env,
                          Local<Context> context,
-                         Local<Object> wrap,
-                         const std::string& name)
+                         Local<Object> wrap)
   : HandleWrap(env,
                wrap,
                reinterpret_cast<uv_handle_t*>(&async_),
                AsyncWrap::PROVIDER_MESSAGEPORT),
-    data_(new MessagePortData(this, name)) {
+    data_(new MessagePortData(this)) {
   auto onmessage = [](uv_async_t* handle) {
     // Called when data has been put into the queue.
     MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
@@ -645,7 +629,7 @@ MessagePort* MessagePort::New(
     Environment* env,
     Local<Context> context,
     std::unique_ptr<MessagePortData> data,
-    const std::string& name) {
+    std::shared_ptr<SiblingGroup> sibling_group) {
   Context::Scope context_scope(context);
   Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
 
@@ -654,7 +638,7 @@ MessagePort* MessagePort::New(
   Local<Object> instance;
   if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
     return nullptr;
-  MessagePort* port = new MessagePort(env, context, instance, name);
+  MessagePort* port = new MessagePort(env, context, instance);
   CHECK_NOT_NULL(port);
   if (port->IsHandleClosing()) {
     // Construction failed with an exception.
@@ -662,6 +646,7 @@ MessagePort* MessagePort::New(
   }
 
   if (data) {
+    CHECK(!sibling_group);
     port->Detach();
     port->data_ = std::move(data);
 
@@ -673,6 +658,8 @@ MessagePort* MessagePort::New(
     // If the existing MessagePortData object had pending messages, this is
     // the easiest way to run that queue.
     port->TriggerAsync();
+  } else if (sibling_group) {
+    sibling_group->Entangle(port->data_.get());
   }
   return port;
 }
@@ -1067,7 +1054,7 @@ void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
 }
 
 void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
-  Entangle(a, b->data_.get());
+  MessagePortData::Entangle(a->data_.get(), b->data_.get());
 }
 
 void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
@@ -1274,7 +1261,6 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
 }
 
 std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
-  if (name.empty()) return {};
   Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
   std::shared_ptr<SiblingGroup> group;
   auto i = groups_.find(name);
@@ -1348,14 +1334,24 @@ Maybe<bool> SiblingGroup::Dispatch(
   return Just(true);
 }
 
-void SiblingGroup::Entangle(MessagePortData* data) {
+void SiblingGroup::Entangle(MessagePortData* port) {
+  Entangle({ port });
+}
+
+void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
   Mutex::ScopedLock lock(group_mutex_);
-  ports_.insert(data);
+  for (MessagePortData* data : ports) {
+    ports_.insert(data);
+    CHECK(!data->group_);
+    data->group_ = shared_from_this();
+  }
 }
 
 void SiblingGroup::Disentangle(MessagePortData* data) {
+  auto self = shared_from_this();  // Keep alive until end of function.
   Mutex::ScopedLock lock(group_mutex_);
   ports_.erase(data);
+  data->group_.reset();
 
   data->AddToIncomingQueue(std::make_shared<Message>());
   // If this is an anonymous group and there's another port, close it.
@@ -1407,8 +1403,10 @@ static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
   Context::Scope context_scope(env->context());
   Utf8Value name(env->isolate(), args[0]);
   MessagePort* port =
-      MessagePort::New(env, env->context(), nullptr, std::string(*name));
-  args.GetReturnValue().Set(port->object());
+      MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
+  if (port != nullptr) {
+    args.GetReturnValue().Set(port->object());
+  }
 }
 
 static void InitMessaging(Local<Object> target,
diff --git a/src/node_messaging.h b/src/node_messaging.h
index 22c11321ef7543..81853a083b533d 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -110,7 +110,7 @@ class Message : public MemoryRetainer {
   friend class MessagePort;
 };
 
-class SiblingGroup {
+class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
  public:
   // Named SiblingGroup, Used for one-to-many BroadcastChannels.
   static std::shared_ptr<SiblingGroup> Get(const std::string& name);
@@ -134,7 +134,7 @@ class SiblingGroup {
       std::string* error = nullptr);
 
   void Entangle(MessagePortData* data);
-
+  void Entangle(std::initializer_list<MessagePortData*> data);
   void Disentangle(MessagePortData* data);
 
   const std::string& name() const { return name_; }
@@ -159,9 +159,7 @@ class SiblingGroup {
 // a specific Environment/Isolate/event loop, for easier transfer between those.
 class MessagePortData : public TransferData {
  public:
-  explicit MessagePortData(
-      MessagePort* owner,
-      const std::string& name = std::string());
+  explicit MessagePortData(MessagePort* owner);
   ~MessagePortData() override;
 
   MessagePortData(MessagePortData&& other) = delete;
@@ -203,6 +201,7 @@ class MessagePortData : public TransferData {
   MessagePort* owner_ = nullptr;
   std::shared_ptr<SiblingGroup> group_;
   friend class MessagePort;
+  friend class SiblingGroup;
 };
 
 // A message port that receives messages from other threads, including
@@ -216,8 +215,7 @@ class MessagePort : public HandleWrap {
   // creating MessagePort instances.
   MessagePort(Environment* env,
               v8::Local<v8::Context> context,
-              v8::Local<v8::Object> wrap,
-              const std::string& name = std::string());
+              v8::Local<v8::Object> wrap);
 
  public:
   ~MessagePort() override;
@@ -226,8 +224,8 @@ class MessagePort : public HandleWrap {
   // `MessagePortData` object.
   static MessagePort* New(Environment* env,
                           v8::Local<v8::Context> context,
-                          std::unique_ptr<MessagePortData> data = nullptr,
-                          const std::string& name = std::string());
+                          std::unique_ptr<MessagePortData> data = {},
+                          std::shared_ptr<SiblingGroup> sibling_group = {});
 
   // Send a message, i.e. deliver it into the sibling's incoming queue.
   // If this port is closed, or if there is no sibling, this message is