Skip to content

Commit 74d11e7

Browse files
gireeshpunathiladdaleax
authored andcommitted
worker: refactor thread life cycle management
The current mechanism of uses two async handles, one owned by the creator of the worker thread to terminate a running worker, and another one employed by the worker to interrupt its creator on its natural termination. The force termination piggybacks on the message- passing mechanism to inform the worker to quiesce. Also there are few flags that represent the other thread's state / request state because certain code path is shared by multiple control flows, and there are certain code path where the async handles may not have come to life. Refactor into an AsyncRequest abstraction that exposes routines to install a handle as well as to save a state. PR-URL: #26099 Refs: #21283 Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 6828fbb commit 74d11e7

File tree

5 files changed

+104
-99
lines changed

5 files changed

+104
-99
lines changed

src/node_messaging.cc

-16
Original file line numberDiff line numberDiff line change
@@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
584584
// Get the head of the message queue.
585585
Mutex::ScopedLock lock(data_->mutex_);
586586

587-
if (stop_event_loop_) {
588-
Debug(this, "MessagePort stops loop as requested");
589-
CHECK(!data_->receiving_messages_);
590-
uv_stop(env()->event_loop());
591-
break;
592-
}
593-
594587
Debug(this, "MessagePort has message, receiving = %d",
595588
static_cast<int>(data_->receiving_messages_));
596589

@@ -740,15 +733,6 @@ void MessagePort::Stop() {
740733
data_->receiving_messages_ = false;
741734
}
742735

743-
void MessagePort::StopEventLoop() {
744-
Mutex::ScopedLock lock(data_->mutex_);
745-
data_->receiving_messages_ = false;
746-
stop_event_loop_ = true;
747-
748-
Debug(this, "Received StopEventLoop request");
749-
TriggerAsync();
750-
}
751-
752736
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
753737
Environment* env = Environment::GetCurrent(args);
754738
MessagePort* port;

src/node_messaging.h

-4
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
159159
void Start();
160160
// Stop processing messages on this port as a receiving end.
161161
void Stop();
162-
// Stop processing messages on this port as a receiving end,
163-
// and stop the event loop that this port is associated with.
164-
void StopEventLoop();
165162

166163
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
167164
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
206203
inline uv_async_t* async();
207204

208205
std::unique_ptr<MessagePortData> data_ = nullptr;
209-
bool stop_event_loop_ = false;
210206

211207
friend class MessagePortData;
212208
};

src/node_worker.cc

+75-62
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
5858

5959
} // anonymous namespace
6060

61+
void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
62+
Mutex::ScopedLock lock(mutex_);
63+
env_ = env;
64+
async_ = new uv_async_t;
65+
if (data != nullptr) async_->data = data;
66+
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
67+
}
68+
69+
void AsyncRequest::Uninstall() {
70+
Mutex::ScopedLock lock(mutex_);
71+
if (async_ != nullptr)
72+
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
73+
}
74+
75+
void AsyncRequest::Stop() {
76+
Mutex::ScopedLock lock(mutex_);
77+
stop_ = true;
78+
if (async_ != nullptr) uv_async_send(async_);
79+
}
80+
81+
void AsyncRequest::SetStopped(bool flag) {
82+
Mutex::ScopedLock lock(mutex_);
83+
stop_ = flag;
84+
}
85+
86+
bool AsyncRequest::IsStopped() const {
87+
Mutex::ScopedLock lock(mutex_);
88+
return stop_;
89+
}
90+
91+
uv_async_t* AsyncRequest::GetHandle() {
92+
Mutex::ScopedLock lock(mutex_);
93+
return async_;
94+
}
95+
96+
void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
97+
Mutex::ScopedLock lock(mutex_);
98+
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
99+
}
100+
61101
Worker::Worker(Environment* env,
62102
Local<Object> wrap,
63103
const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
98138
}
99139

100140
bool Worker::is_stopped() const {
101-
Mutex::ScopedLock stopped_lock(stopped_mutex_);
102-
return stopped_;
141+
return thread_stopper_.IsStopped();
103142
}
104143

105144
// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
207246
Context::Scope context_scope(env_->context());
208247
if (child_port != nullptr)
209248
child_port->Close();
249+
thread_stopper_.Uninstall();
250+
thread_stopper_.SetStopped(true);
210251
env_->stop_sub_worker_contexts();
211252
env_->RunCleanup();
212253
RunAtExit(env_.get());
@@ -215,23 +256,19 @@ void Worker::Run() {
215256
WaitForWorkerInspectorToStop(env_.get());
216257
#endif
217258

218-
{
219-
Mutex::ScopedLock stopped_lock(stopped_mutex_);
220-
stopped_ = true;
221-
}
222-
223259
// This call needs to be made while the `Environment` is still alive
224260
// because we assume that it is available for async tracking in the
225261
// NodePlatform implementation.
226262
platform_->DrainTasks(isolate_);
227263
}
228264
});
229265

266+
if (thread_stopper_.IsStopped()) return;
230267
{
231268
HandleScope handle_scope(isolate_);
232269
Local<Context> context = NewContext(isolate_);
233-
if (is_stopped()) return;
234270

271+
if (thread_stopper_.IsStopped()) return;
235272
CHECK(!context.IsEmpty());
236273
Context::Scope context_scope(context);
237274
{
@@ -253,6 +290,14 @@ void Worker::Run() {
253290
Debug(this, "Created Environment for worker with id %llu", thread_id_);
254291

255292
if (is_stopped()) return;
293+
thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
294+
Environment* env_ = static_cast<Environment*>(handle->data);
295+
uv_stop(env_->event_loop());
296+
});
297+
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
298+
299+
Debug(this, "Created Environment for worker with id %llu", thread_id_);
300+
if (thread_stopper_.IsStopped()) return;
256301
{
257302
HandleScope handle_scope(isolate_);
258303
Mutex::ScopedLock lock(mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
268313
Debug(this, "Created message port for worker %llu", thread_id_);
269314
}
270315

271-
if (is_stopped()) return;
316+
if (thread_stopper_.IsStopped()) return;
272317
{
273318
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
274319
StartWorkerInspector(env_.get(),
@@ -289,22 +334,21 @@ void Worker::Run() {
289334
Debug(this, "Loaded environment for worker %llu", thread_id_);
290335
}
291336

292-
if (is_stopped()) return;
337+
if (thread_stopper_.IsStopped()) return;
293338
{
294339
SealHandleScope seal(isolate_);
295340
bool more;
296341
env_->performance_state()->Mark(
297342
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
298343
do {
299-
if (is_stopped()) break;
344+
if (thread_stopper_.IsStopped()) break;
300345
uv_run(&data.loop_, UV_RUN_DEFAULT);
301-
if (is_stopped()) break;
346+
if (thread_stopper_.IsStopped()) break;
302347

303348
platform_->DrainTasks(isolate_);
304349

305350
more = uv_loop_alive(&data.loop_);
306-
if (more && !is_stopped())
307-
continue;
351+
if (more && !thread_stopper_.IsStopped()) continue;
308352

309353
EmitBeforeExit(env_.get());
310354

@@ -319,7 +363,7 @@ void Worker::Run() {
319363

320364
{
321365
int exit_code;
322-
bool stopped = is_stopped();
366+
bool stopped = thread_stopper_.IsStopped();
323367
if (!stopped)
324368
exit_code = EmitExit(env_.get());
325369
Mutex::ScopedLock lock(mutex_);
@@ -341,34 +385,11 @@ void Worker::JoinThread() {
341385
thread_joined_ = true;
342386

343387
env()->remove_sub_worker_context(this);
344-
345-
if (thread_exit_async_) {
346-
env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
347-
delete async;
348-
});
349-
350-
if (scheduled_on_thread_stopped_)
351-
OnThreadStopped();
352-
}
388+
OnThreadStopped();
389+
on_thread_finished_.Uninstall();
353390
}
354391

355392
void Worker::OnThreadStopped() {
356-
{
357-
Mutex::ScopedLock lock(mutex_);
358-
scheduled_on_thread_stopped_ = false;
359-
360-
Debug(this, "Worker %llu thread stopped", thread_id_);
361-
362-
{
363-
Mutex::ScopedLock stopped_lock(stopped_mutex_);
364-
CHECK(stopped_);
365-
}
366-
367-
parent_port_ = nullptr;
368-
}
369-
370-
JoinThread();
371-
372393
{
373394
HandleScope handle_scope(env()->isolate());
374395
Context::Scope context_scope(env()->context());
@@ -391,7 +412,7 @@ Worker::~Worker() {
391412
Mutex::ScopedLock lock(mutex_);
392413
JoinThread();
393414

394-
CHECK(stopped_);
415+
CHECK(thread_stopper_.IsStopped());
395416
CHECK(thread_joined_);
396417

397418
// This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
480501
Mutex::ScopedLock lock(w->mutex_);
481502

482503
w->env()->add_sub_worker_context(w);
483-
w->stopped_ = false;
484504
w->thread_joined_ = false;
505+
w->thread_stopper_.SetStopped(false);
485506

486-
w->thread_exit_async_.reset(new uv_async_t);
487-
w->thread_exit_async_->data = w;
488-
CHECK_EQ(uv_async_init(w->env()->event_loop(),
489-
w->thread_exit_async_.get(),
490-
[](uv_async_t* handle) {
491-
static_cast<Worker*>(handle->data)->OnThreadStopped();
492-
}), 0);
507+
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
508+
Worker* w_ = static_cast<Worker*>(handle->data);
509+
CHECK(w_->thread_stopper_.IsStopped());
510+
w_->parent_port_ = nullptr;
511+
w_->JoinThread();
512+
});
493513

494514
uv_thread_options_t thread_options;
495515
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
505525
w->Run();
506526

507527
Mutex::ScopedLock lock(w->mutex_);
508-
CHECK(w->thread_exit_async_);
509-
w->scheduled_on_thread_stopped_ = true;
510-
uv_async_send(w->thread_exit_async_.get());
528+
w->on_thread_finished_.Stop();
511529
}, static_cast<void*>(w)), 0);
512530
}
513531

@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
523541
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
524542
Worker* w;
525543
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
526-
if (w->thread_exit_async_)
527-
uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
544+
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
528545
}
529546

530547
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
531548
Worker* w;
532549
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
533-
if (w->thread_exit_async_)
534-
uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
550+
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
535551
}
536552

537553
void Worker::Exit(int code) {
538554
Mutex::ScopedLock lock(mutex_);
539-
Mutex::ScopedLock stopped_lock(stopped_mutex_);
540555

541556
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
542-
543-
if (!stopped_) {
544-
stopped_ = true;
557+
if (!thread_stopper_.IsStopped()) {
545558
exit_code_ = code;
546-
if (child_port_ != nullptr)
547-
child_port_->StopEventLoop();
559+
Debug(this, "Received StopEventLoop request");
560+
thread_stopper_.Stop();
548561
if (isolate_ != nullptr)
549562
isolate_->TerminateExecution();
550563
}

src/node_worker.h

+27-15
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,35 @@
33

44
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
55

6-
#include "node_messaging.h"
76
#include <unordered_map>
7+
#include "node_messaging.h"
8+
#include "uv.h"
89

910
namespace node {
1011
namespace worker {
1112

1213
class WorkerThreadData;
1314

15+
class AsyncRequest : public MemoryRetainer {
16+
public:
17+
AsyncRequest() {}
18+
void Install(Environment* env, void* data, uv_async_cb target);
19+
void Uninstall();
20+
void Stop();
21+
void SetStopped(bool flag);
22+
bool IsStopped() const;
23+
uv_async_t* GetHandle();
24+
void MemoryInfo(MemoryTracker* tracker) const override;
25+
SET_MEMORY_INFO_NAME(AsyncRequest)
26+
SET_SELF_SIZE(AsyncRequest)
27+
28+
private:
29+
Environment* env_;
30+
uv_async_t* async_ = nullptr;
31+
mutable Mutex mutex_;
32+
bool stop_ = true;
33+
};
34+
1435
// A worker thread, as represented in its parent thread.
1536
class Worker : public AsyncWrap {
1637
public:
@@ -31,11 +52,9 @@ class Worker : public AsyncWrap {
3152
void JoinThread();
3253

3354
void MemoryInfo(MemoryTracker* tracker) const override {
34-
tracker->TrackFieldWithSize(
35-
"isolate_data", sizeof(IsolateData), "IsolateData");
36-
tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
37-
tracker->TrackField("thread_exit_async", *thread_exit_async_);
3855
tracker->TrackField("parent_port", parent_port_);
56+
tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
57+
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
3958
}
4059

4160
SET_MEMORY_INFO_NAME(Worker)
@@ -67,16 +86,6 @@ class Worker : public AsyncWrap {
6786
// This mutex protects access to all variables listed below it.
6887
mutable Mutex mutex_;
6988

70-
// Currently only used for telling the parent thread that the child
71-
// thread exited.
72-
std::unique_ptr<uv_async_t> thread_exit_async_;
73-
bool scheduled_on_thread_stopped_ = false;
74-
75-
// This mutex only protects stopped_. If both locks are acquired, this needs
76-
// to be the latter one.
77-
mutable Mutex stopped_mutex_;
78-
bool stopped_ = true;
79-
8089
bool thread_joined_ = true;
8190
int exit_code_ = 0;
8291
uint64_t thread_id_ = -1;
@@ -96,6 +105,9 @@ class Worker : public AsyncWrap {
96105
// instance refers to it via its [kPort] property.
97106
MessagePort* parent_port_ = nullptr;
98107

108+
AsyncRequest thread_stopper_;
109+
AsyncRequest on_thread_finished_;
110+
99111
friend class WorkerThreadData;
100112
};
101113

0 commit comments

Comments
 (0)