Skip to content

Commit f8c45b2

Browse files
addaleaxcodebytere
authored andcommitted
src: remove AsyncRequest
Remove `AsyncRequest` from the source code, and replace its usage with threadsafe `SetImmediate()` calls. This has the advantage of being able to pass in any function, rather than one that is defined when the `AsyncRequest` is “installed”. This necessitates two changes: - The stopping flag (which was only used in one case and ignored in the other) is now a direct member of the `Environment` class. - Workers no longer have their own libuv handles, requiring manual management of their libuv ref count. As a drive-by fix, the `can_call_into_js` variable was turned into an atomic variable. While there have been no bug reports, the flag is set from `Stop(env)` calls, which are supposed to be possible from any thread. PR-URL: #31386 Refs: openjs-foundation/summit#240 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 600e96e commit f8c45b2

File tree

5 files changed

+52
-103
lines changed

5 files changed

+52
-103
lines changed

src/env-inl.h

+14-9
Original file line numberDiff line numberDiff line change
@@ -894,8 +894,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
894894
sub_worker_contexts_.erase(context);
895895
}
896896

897+
inline void Environment::add_refs(int64_t diff) {
898+
task_queues_async_refs_ += diff;
899+
CHECK_GE(task_queues_async_refs_, 0);
900+
if (task_queues_async_refs_ == 0)
901+
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
902+
else
903+
uv_ref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
904+
}
905+
897906
inline bool Environment::is_stopping() const {
898-
return thread_stopper_.is_stopped();
907+
return is_stopping_.load();
908+
}
909+
910+
inline void Environment::set_stopping(bool value) {
911+
is_stopping_.store(value);
899912
}
900913

901914
inline std::list<node_module>* Environment::extra_linked_bindings() {
@@ -1205,14 +1218,6 @@ int64_t Environment::base_object_count() const {
12051218
return base_object_count_;
12061219
}
12071220

1208-
bool AsyncRequest::is_stopped() const {
1209-
return stopped_.load();
1210-
}
1211-
1212-
void AsyncRequest::set_stopped(bool flag) {
1213-
stopped_.store(flag);
1214-
}
1215-
12161221
#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
12171222
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
12181223
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)

src/env.cc

+2-44
Original file line numberDiff line numberDiff line change
@@ -474,14 +474,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
474474
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
475475
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
476476

477-
thread_stopper()->Install(
478-
this, static_cast<void*>(this), [](uv_async_t* handle) {
479-
Environment* env = static_cast<Environment*>(handle->data);
480-
uv_stop(env->event_loop());
481-
});
482-
thread_stopper()->set_stopped(false);
483-
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));
484-
485477
// Register clean-up cb to be called to clean up the handles
486478
// when the environment is freed, note that they are not cleaned in
487479
// the one environment per process setup, but will be called in
@@ -499,8 +491,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
499491

500492
void Environment::ExitEnv() {
501493
set_can_call_into_js(false);
502-
thread_stopper()->Stop();
494+
set_stopping(true);
503495
isolate_->TerminateExecution();
496+
SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); });
504497
}
505498

506499
void Environment::RegisterHandleCleanups() {
@@ -605,7 +598,6 @@ void Environment::RunCleanup() {
605598
started_cleanup_ = true;
606599
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
607600
"RunCleanup", this);
608-
thread_stopper()->Uninstall();
609601
CleanupHandles();
610602

611603
while (!cleanup_hooks_.empty()) {
@@ -1017,7 +1009,6 @@ inline size_t Environment::SelfSize() const {
10171009
// TODO(joyeecheung): refactor the MemoryTracker interface so
10181010
// this can be done for common types within the Track* calls automatically
10191011
// if a certain scope is entered.
1020-
size -= sizeof(thread_stopper_);
10211012
size -= sizeof(async_hooks_);
10221013
size -= sizeof(tick_info_);
10231014
size -= sizeof(immediate_info_);
@@ -1039,7 +1030,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const {
10391030
tracker->TrackField("fs_stats_field_array", fs_stats_field_array_);
10401031
tracker->TrackField("fs_stats_field_bigint_array",
10411032
fs_stats_field_bigint_array_);
1042-
tracker->TrackField("thread_stopper", thread_stopper_);
10431033
tracker->TrackField("cleanup_hooks", cleanup_hooks_);
10441034
tracker->TrackField("async_hooks", async_hooks_);
10451035
tracker->TrackField("immediate_info", immediate_info_);
@@ -1103,38 +1093,6 @@ void Environment::CleanupFinalizationGroups() {
11031093
}
11041094
}
11051095

1106-
void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
1107-
CHECK_NULL(async_);
1108-
env_ = env;
1109-
async_ = new uv_async_t;
1110-
async_->data = data;
1111-
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
1112-
}
1113-
1114-
void AsyncRequest::Uninstall() {
1115-
if (async_ != nullptr) {
1116-
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
1117-
async_ = nullptr;
1118-
}
1119-
}
1120-
1121-
void AsyncRequest::Stop() {
1122-
set_stopped(true);
1123-
if (async_ != nullptr) uv_async_send(async_);
1124-
}
1125-
1126-
uv_async_t* AsyncRequest::GetHandle() {
1127-
return async_;
1128-
}
1129-
1130-
void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
1131-
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
1132-
}
1133-
1134-
AsyncRequest::~AsyncRequest() {
1135-
CHECK_NULL(async_);
1136-
}
1137-
11381096
// Not really any better place than env.cc at this moment.
11391097
void BaseObject::DeleteMe(void* data) {
11401098
BaseObject* self = static_cast<BaseObject*>(data);

src/env.h

+12-35
Original file line numberDiff line numberDiff line change
@@ -587,34 +587,6 @@ struct AllocatedBuffer {
587587
friend class Environment;
588588
};
589589

590-
class AsyncRequest : public MemoryRetainer {
591-
public:
592-
AsyncRequest() = default;
593-
~AsyncRequest() override;
594-
595-
AsyncRequest(const AsyncRequest&) = delete;
596-
AsyncRequest& operator=(const AsyncRequest&) = delete;
597-
AsyncRequest(AsyncRequest&&) = delete;
598-
AsyncRequest& operator=(AsyncRequest&&) = delete;
599-
600-
void Install(Environment* env, void* data, uv_async_cb target);
601-
void Uninstall();
602-
void Stop();
603-
inline void set_stopped(bool flag);
604-
inline bool is_stopped() const;
605-
uv_async_t* GetHandle();
606-
void MemoryInfo(MemoryTracker* tracker) const override;
607-
608-
609-
SET_MEMORY_INFO_NAME(AsyncRequest)
610-
SET_SELF_SIZE(AsyncRequest)
611-
612-
private:
613-
Environment* env_;
614-
uv_async_t* async_ = nullptr;
615-
std::atomic_bool stopped_ {true};
616-
};
617-
618590
class KVStore {
619591
public:
620592
KVStore() = default;
@@ -1062,6 +1034,14 @@ class Environment : public MemoryRetainer {
10621034
inline bool can_call_into_js() const;
10631035
inline void set_can_call_into_js(bool can_call_into_js);
10641036

1037+
// Increase or decrease a counter that manages whether this Environment
1038+
// keeps the event loop alive on its own or not. The counter starts out at 0,
1039+
// meaning it does not, and any positive value will make it keep the event
1040+
// loop alive.
1041+
// This is used by Workers to manage their own .ref()/.unref() implementation,
1042+
// as Workers aren't directly associated with their own libuv handles.
1043+
inline void add_refs(int64_t diff);
1044+
10651045
inline bool has_run_bootstrapping_code() const;
10661046
inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code);
10671047

@@ -1082,6 +1062,7 @@ class Environment : public MemoryRetainer {
10821062
inline void remove_sub_worker_context(worker::Worker* context);
10831063
void stop_sub_worker_contexts();
10841064
inline bool is_stopping() const;
1065+
inline void set_stopping(bool value);
10851066
inline std::list<node_module>* extra_linked_bindings();
10861067
inline node_module* extra_linked_bindings_head();
10871068
inline const Mutex& extra_linked_bindings_mutex() const;
@@ -1223,8 +1204,6 @@ class Environment : public MemoryRetainer {
12231204
inline std::shared_ptr<EnvironmentOptions> options();
12241205
inline std::shared_ptr<HostPort> inspector_host_port();
12251206

1226-
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }
1227-
12281207
// The BaseObject count is a debugging helper that makes sure that there are
12291208
// no memory leaks caused by BaseObjects staying alive longer than expected
12301209
// (in particular, no circular BaseObjectPtr references).
@@ -1285,6 +1264,7 @@ class Environment : public MemoryRetainer {
12851264
uv_prepare_t idle_prepare_handle_;
12861265
uv_check_t idle_check_handle_;
12871266
uv_async_t task_queues_async_;
1267+
int64_t task_queues_async_refs_ = 0;
12881268
bool profiler_idle_notifier_started_ = false;
12891269

12901270
AsyncHooks async_hooks_;
@@ -1342,7 +1322,7 @@ class Environment : public MemoryRetainer {
13421322
bool has_run_bootstrapping_code_ = false;
13431323
bool has_serialized_options_ = false;
13441324

1345-
bool can_call_into_js_ = true;
1325+
std::atomic_bool can_call_into_js_ { true };
13461326
Flags flags_;
13471327
uint64_t thread_id_;
13481328
std::unordered_set<worker::Worker*> sub_worker_contexts_;
@@ -1460,10 +1440,7 @@ class Environment : public MemoryRetainer {
14601440
bool started_cleanup_ = false;
14611441

14621442
int64_t base_object_count_ = 0;
1463-
1464-
// A custom async abstraction (a pair of async handle and a state variable)
1465-
// Used by embedders to shutdown running Node instance.
1466-
AsyncRequest thread_stopper_;
1443+
std::atomic_bool is_stopping_ { false };
14671444

14681445
template <typename T>
14691446
void ForEachBaseObject(T&& iterator);

src/node_worker.cc

+22-12
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ void Worker::Run() {
268268
stopped_ = true;
269269
this->env_ = nullptr;
270270
}
271-
env_->thread_stopper()->set_stopped(true);
271+
env_->set_stopping(true);
272272
env_->stop_sub_worker_contexts();
273273
env_->RunCleanup();
274274
RunAtExit(env_.get());
@@ -412,7 +412,6 @@ void Worker::JoinThread() {
412412
thread_joined_ = true;
413413

414414
env()->remove_sub_worker_context(this);
415-
on_thread_finished_.Uninstall();
416415

417416
{
418417
HandleScope handle_scope(env()->isolate());
@@ -439,6 +438,8 @@ void Worker::JoinThread() {
439438
}
440439

441440
Worker::~Worker() {
441+
JoinThread();
442+
442443
Mutex::ScopedLock lock(mutex_);
443444

444445
CHECK(stopped_);
@@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
574575
w->stopped_ = false;
575576
w->thread_joined_ = false;
576577

577-
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
578-
Worker* w_ = static_cast<Worker*>(handle->data);
579-
CHECK(w_->is_stopped());
580-
w_->parent_port_ = nullptr;
581-
w_->JoinThread();
582-
delete w_;
583-
});
578+
if (w->has_ref_)
579+
w->env()->add_refs(1);
584580

585581
uv_thread_options_t thread_options;
586582
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
587583
thread_options.stack_size = kStackSize;
588584
CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
585+
// XXX: This could become a std::unique_ptr, but that makes at least
586+
// gcc 6.3 detect undefined behaviour when there shouldn't be any.
587+
// gcc 7+ handles this well.
589588
Worker* w = static_cast<Worker*>(arg);
590589
const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
591590

@@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
596595
w->Run();
597596

598597
Mutex::ScopedLock lock(w->mutex_);
599-
w->on_thread_finished_.Stop();
598+
w->env()->SetImmediateThreadsafe(
599+
[w = std::unique_ptr<Worker>(w)](Environment* env) {
600+
if (w->has_ref_)
601+
env->add_refs(-1);
602+
// implicitly delete w
603+
});
600604
}, static_cast<void*>(w)), 0);
601605
}
602606

@@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
611615
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
612616
Worker* w;
613617
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
614-
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
618+
if (!w->has_ref_) {
619+
w->has_ref_ = true;
620+
w->env()->add_refs(1);
621+
}
615622
}
616623

617624
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
618625
Worker* w;
619626
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
620-
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
627+
if (w->has_ref_) {
628+
w->has_ref_ = false;
629+
w->env()->add_refs(-1);
630+
}
621631
}
622632

623633
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {

src/node_worker.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ class Worker : public AsyncWrap {
4141

4242
void MemoryInfo(MemoryTracker* tracker) const override {
4343
tracker->TrackField("parent_port", parent_port_);
44-
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
4544
}
4645

4746
SET_MEMORY_INFO_NAME(Worker)
@@ -107,14 +106,14 @@ class Worker : public AsyncWrap {
107106
// instance refers to it via its [kPort] property.
108107
MessagePort* parent_port_ = nullptr;
109108

110-
AsyncRequest on_thread_finished_;
111-
112109
// A raw flag that is used by creator and worker threads to
113110
// sync up on pre-mature termination of worker - while in the
114111
// warmup phase. Once the worker is fully warmed up, use the
115112
// async handle of the worker's Environment for the same purpose.
116113
bool stopped_ = true;
117114

115+
bool has_ref_ = true;
116+
118117
// The real Environment of the worker object. It has a lesser
119118
// lifespan than the worker object itself - comes to life
120119
// when the worker thread creates a new Environment, and gets

0 commit comments

Comments
 (0)