Skip to content

Commit 600e96e

Browse files
addaleaxcodebytere
authored andcommitted
src: add a threadsafe variant of SetImmediate()
Add a variant of `SetImmediate()` that can be called from any thread. This allows removing the `AsyncRequest` abstraction and replaces it with a more generic mechanism. PR-URL: #31386 Refs: openjs-foundation/summit#240 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 74a7cdb commit 600e96e

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

src/env-inl.h

+18-1
Original file line numberDiff line numberDiff line change
@@ -745,13 +745,15 @@ Environment::NativeImmediateQueue::Shift() {
745745
if (!head_)
746746
tail_ = nullptr; // The queue is now empty.
747747
}
748+
size_--;
748749
return ret;
749750
}
750751

751752
void Environment::NativeImmediateQueue::Push(
752753
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
753754
NativeImmediateCallback* prev_tail = tail_;
754755

756+
size_++;
755757
tail_ = cb.get();
756758
if (prev_tail != nullptr)
757759
prev_tail->set_next(std::move(cb));
@@ -771,6 +773,10 @@ void Environment::NativeImmediateQueue::ConcatMove(
771773
other.size_ = 0;
772774
}
773775

776+
size_t Environment::NativeImmediateQueue::size() const {
777+
return size_.load();
778+
}
779+
774780
template <typename Fn>
775781
void Environment::CreateImmediate(Fn&& cb, bool ref) {
776782
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
@@ -792,6 +798,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
792798
CreateImmediate(std::move(cb), false);
793799
}
794800

801+
template <typename Fn>
802+
void Environment::SetImmediateThreadsafe(Fn&& cb) {
803+
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
804+
std::move(cb), false);
805+
{
806+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
807+
native_immediates_threadsafe_.Push(std::move(callback));
808+
}
809+
uv_async_send(&task_queues_async_);
810+
}
811+
795812
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
796813
: refed_(refed) {}
797814

@@ -1151,7 +1168,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
11511168
inline void Environment::RegisterFinalizationGroupForCleanup(
11521169
v8::Local<v8::FinalizationGroup> group) {
11531170
cleanup_finalization_groups_.emplace_back(isolate(), group);
1154-
uv_async_send(&cleanup_finalization_groups_async_);
1171+
uv_async_send(&task_queues_async_);
11551172
}
11561173

11571174
size_t CleanupHookCallback::Hash::operator()(

src/env.cc

+15-5
Original file line numberDiff line numberDiff line change
@@ -463,15 +463,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
463463
uv_check_init(event_loop(), &idle_check_handle_);
464464
uv_async_init(
465465
event_loop(),
466-
&cleanup_finalization_groups_async_,
466+
&task_queues_async_,
467467
[](uv_async_t* async) {
468468
Environment* env = ContainerOf(
469-
&Environment::cleanup_finalization_groups_async_, async);
469+
&Environment::task_queues_async_, async);
470470
env->CleanupFinalizationGroups();
471+
env->RunAndClearNativeImmediates();
471472
});
472473
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
473474
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
474-
uv_unref(reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_));
475+
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
475476

476477
thread_stopper()->Install(
477478
this, static_cast<void*>(this), [](uv_async_t* handle) {
@@ -535,7 +536,7 @@ void Environment::RegisterHandleCleanups() {
535536
close_and_finish,
536537
nullptr);
537538
RegisterHandleCleanup(
538-
reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_),
539+
reinterpret_cast<uv_handle_t*>(&task_queues_async_),
539540
close_and_finish,
540541
nullptr);
541542
}
@@ -666,6 +667,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
666667
"RunAndClearNativeImmediates", this);
667668
size_t ref_count = 0;
668669

670+
// It is safe to check .size() first, because there is a causal relationship
671+
// between pushes to the threadsafe and this function being called.
672+
// For the common case, it's worth checking the size first before establishing
673+
// a mutex lock.
674+
if (native_immediates_threadsafe_.size() > 0) {
675+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
676+
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
677+
}
678+
669679
NativeImmediateQueue queue;
670680
queue.ConcatMove(std::move(native_immediates_));
671681

@@ -1087,7 +1097,7 @@ void Environment::CleanupFinalizationGroups() {
10871097
if (try_catch.HasCaught() && !try_catch.HasTerminated())
10881098
errors::TriggerUncaughtException(isolate(), try_catch);
10891099
// Re-schedule the execution of the remainder of the queue.
1090-
uv_async_send(&cleanup_finalization_groups_async_);
1100+
uv_async_send(&task_queues_async_);
10911101
return;
10921102
}
10931103
}

src/env.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -1196,6 +1196,9 @@ class Environment : public MemoryRetainer {
11961196
inline void SetImmediate(Fn&& cb);
11971197
template <typename Fn>
11981198
inline void SetUnrefImmediate(Fn&& cb);
1199+
template <typename Fn>
1200+
// This behaves like SetImmediate() but can be called from any thread.
1201+
inline void SetImmediateThreadsafe(Fn&& cb);
11991202
// This needs to be available for the JS-land setImmediate().
12001203
void ToggleImmediateRef(bool ref);
12011204

@@ -1281,7 +1284,7 @@ class Environment : public MemoryRetainer {
12811284
uv_idle_t immediate_idle_handle_;
12821285
uv_prepare_t idle_prepare_handle_;
12831286
uv_check_t idle_check_handle_;
1284-
uv_async_t cleanup_finalization_groups_async_;
1287+
uv_async_t task_queues_async_;
12851288
bool profiler_idle_notifier_started_ = false;
12861289

12871290
AsyncHooks async_hooks_;
@@ -1433,12 +1436,18 @@ class Environment : public MemoryRetainer {
14331436
// 'other' afterwards.
14341437
inline void ConcatMove(NativeImmediateQueue&& other);
14351438

1439+
// size() is atomic and may be called from any thread.
1440+
inline size_t size() const;
1441+
14361442
private:
1443+
std::atomic<size_t> size_ {0};
14371444
std::unique_ptr<NativeImmediateCallback> head_;
14381445
NativeImmediateCallback* tail_ = nullptr;
14391446
};
14401447

14411448
NativeImmediateQueue native_immediates_;
1449+
Mutex native_immediates_threadsafe_mutex_;
1450+
NativeImmediateQueue native_immediates_threadsafe_;
14421451

14431452
void RunAndClearNativeImmediates(bool only_refed = false);
14441453
static void CheckImmediate(uv_check_t* handle);

0 commit comments

Comments
 (0)