Skip to content

Commit a9fb51f

Browse files
committed
src: align worker and main thread code with embedder API
This addresses some long-standing TODOs by Joyee and me about making the embedder API more powerful and us less reliant on internal APIs for creating the main thread and Workers. PR-URL: #30467 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
1 parent 084c379 commit a9fb51f

12 files changed

+232
-117
lines changed

src/api/environment.cc

+82-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
#include "node_v8_platform-inl.h"
88
#include "uv.h"
99

10+
#if HAVE_INSPECTOR
11+
#include "inspector/worker_inspector.h" // ParentInspectorHandle
12+
#endif
13+
1014
namespace node {
1115
using errors::TryCatchScope;
1216
using v8::Array;
@@ -332,26 +336,40 @@ Environment* CreateEnvironment(IsolateData* isolate_data,
332336
const char* const* argv,
333337
int exec_argc,
334338
const char* const* exec_argv) {
339+
return CreateEnvironment(
340+
isolate_data, context,
341+
std::vector<std::string>(argv, argv + argc),
342+
std::vector<std::string>(exec_argv, exec_argv + exec_argc));
343+
}
344+
345+
Environment* CreateEnvironment(
346+
IsolateData* isolate_data,
347+
Local<Context> context,
348+
const std::vector<std::string>& args,
349+
const std::vector<std::string>& exec_args,
350+
EnvironmentFlags::Flags flags,
351+
ThreadId thread_id) {
335352
Isolate* isolate = context->GetIsolate();
336353
HandleScope handle_scope(isolate);
337354
Context::Scope context_scope(context);
338355
// TODO(addaleax): This is a much better place for parsing per-Environment
339356
// options than the global parse call.
340-
std::vector<std::string> args(argv, argv + argc);
341-
std::vector<std::string> exec_args(exec_argv, exec_argv + exec_argc);
342-
// TODO(addaleax): Provide more sensible flags, in an embedder-accessible way.
343357
Environment* env = new Environment(
344358
isolate_data,
345359
context,
346360
args,
347361
exec_args,
348-
static_cast<Environment::Flags>(Environment::kOwnsProcessState |
349-
Environment::kOwnsInspector));
350-
env->InitializeLibuv(per_process::v8_is_profiling);
362+
flags,
363+
thread_id);
364+
if (flags & EnvironmentFlags::kOwnsProcessState) {
365+
env->set_abort_on_uncaught_exception(false);
366+
}
367+
351368
if (env->RunBootstrapping().IsEmpty()) {
352369
FreeEnvironment(env);
353370
return nullptr;
354371
}
372+
355373
return env;
356374
}
357375

@@ -376,6 +394,58 @@ void FreeEnvironment(Environment* env) {
376394
delete env;
377395
}
378396

397+
InspectorParentHandle::~InspectorParentHandle() {}
398+
399+
// Hide the internal handle class from the public API.
400+
#if HAVE_INSPECTOR
401+
struct InspectorParentHandleImpl : public InspectorParentHandle {
402+
std::unique_ptr<inspector::ParentInspectorHandle> impl;
403+
404+
explicit InspectorParentHandleImpl(
405+
std::unique_ptr<inspector::ParentInspectorHandle>&& impl)
406+
: impl(std::move(impl)) {}
407+
};
408+
#endif
409+
410+
NODE_EXTERN std::unique_ptr<InspectorParentHandle> GetInspectorParentHandle(
411+
Environment* env,
412+
ThreadId thread_id,
413+
const char* url) {
414+
CHECK_NOT_NULL(env);
415+
CHECK_NE(thread_id.id, static_cast<uint64_t>(-1));
416+
#if HAVE_INSPECTOR
417+
return std::make_unique<InspectorParentHandleImpl>(
418+
env->inspector_agent()->GetParentHandle(thread_id.id, url));
419+
#else
420+
return {};
421+
#endif
422+
}
423+
424+
void LoadEnvironment(Environment* env) {
425+
USE(LoadEnvironment(env, {}));
426+
}
427+
428+
MaybeLocal<Value> LoadEnvironment(
429+
Environment* env,
430+
std::unique_ptr<InspectorParentHandle> inspector_parent_handle) {
431+
env->InitializeLibuv(per_process::v8_is_profiling);
432+
env->InitializeDiagnostics();
433+
434+
#if HAVE_INSPECTOR
435+
if (inspector_parent_handle) {
436+
env->InitializeInspector(
437+
std::move(static_cast<InspectorParentHandleImpl*>(
438+
inspector_parent_handle.get())->impl));
439+
} else {
440+
env->InitializeInspector({});
441+
}
442+
#endif
443+
444+
// TODO(joyeecheung): Allow embedders to customize the entry
445+
// point more directly without using _third_party_main.js
446+
return StartExecution(env);
447+
}
448+
379449
Environment* GetCurrentEnvironment(Local<Context> context) {
380450
return Environment::GetCurrent(context);
381451
}
@@ -592,4 +662,10 @@ void AddLinkedBinding(Environment* env,
592662
AddLinkedBinding(env, mod);
593663
}
594664

665+
static std::atomic<uint64_t> next_thread_id{0};
666+
667+
ThreadId AllocateEnvironmentThreadId() {
668+
return ThreadId { next_thread_id++ };
669+
}
670+
595671
} // namespace node

src/env-inl.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -820,8 +820,9 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
820820
{
821821
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
822822
native_immediates_threadsafe_.Push(std::move(callback));
823+
if (task_queues_async_initialized_)
824+
uv_async_send(&task_queues_async_);
823825
}
824-
uv_async_send(&task_queues_async_);
825826
}
826827

827828
template <typename Fn>
@@ -831,8 +832,9 @@ void Environment::RequestInterrupt(Fn&& cb) {
831832
{
832833
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
833834
native_immediates_interrupts_.Push(std::move(callback));
835+
if (task_queues_async_initialized_)
836+
uv_async_send(&task_queues_async_);
834837
}
835-
uv_async_send(&task_queues_async_);
836838
RequestInterruptFromV8();
837839
}
838840

@@ -893,11 +895,11 @@ inline bool Environment::is_main_thread() const {
893895
}
894896

895897
inline bool Environment::owns_process_state() const {
896-
return flags_ & kOwnsProcessState;
898+
return flags_ & EnvironmentFlags::kOwnsProcessState;
897899
}
898900

899901
inline bool Environment::owns_inspector() const {
900-
return flags_ & kOwnsInspector;
902+
return flags_ & EnvironmentFlags::kOwnsInspector;
901903
}
902904

903905
bool Environment::filehandle_close_warning() const {
@@ -1226,6 +1228,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
12261228
inline void Environment::RegisterFinalizationGroupForCleanup(
12271229
v8::Local<v8::FinalizationGroup> group) {
12281230
cleanup_finalization_groups_.emplace_back(isolate(), group);
1231+
DCHECK(task_queues_async_initialized_);
12291232
uv_async_send(&task_queues_async_);
12301233
}
12311234

src/env.cc

+34-13
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,6 @@ void TrackingTraceStateObserver::UpdateTraceCategoryState() {
232232
.ToLocalChecked();
233233
}
234234

235-
static std::atomic<uint64_t> next_thread_id{0};
236-
237-
uint64_t Environment::AllocateThreadId() {
238-
return next_thread_id++;
239-
}
240-
241235
void Environment::CreateProperties() {
242236
HandleScope handle_scope(isolate_);
243237
Local<Context> ctx = context();
@@ -294,8 +288,8 @@ Environment::Environment(IsolateData* isolate_data,
294288
Local<Context> context,
295289
const std::vector<std::string>& args,
296290
const std::vector<std::string>& exec_args,
297-
Flags flags,
298-
uint64_t thread_id)
291+
EnvironmentFlags::Flags flags,
292+
ThreadId thread_id)
299293
: isolate_(context->GetIsolate()),
300294
isolate_data_(isolate_data),
301295
immediate_info_(context->GetIsolate()),
@@ -307,14 +301,23 @@ Environment::Environment(IsolateData* isolate_data,
307301
should_abort_on_uncaught_toggle_(isolate_, 1),
308302
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
309303
flags_(flags),
310-
thread_id_(thread_id == kNoThreadId ? AllocateThreadId() : thread_id),
304+
thread_id_(thread_id.id == static_cast<uint64_t>(-1) ?
305+
AllocateEnvironmentThreadId().id : thread_id.id),
311306
fs_stats_field_array_(isolate_, kFsStatsBufferLength),
312307
fs_stats_field_bigint_array_(isolate_, kFsStatsBufferLength),
313308
context_(context->GetIsolate(), context) {
314309
// We'll be creating new objects so make sure we've entered the context.
315310
HandleScope handle_scope(isolate());
316311
Context::Scope context_scope(context);
317312

313+
// Set some flags if only kDefaultFlags was passed. This can make API version
314+
// transitions easier for embedders.
315+
if (flags_ & EnvironmentFlags::kDefaultFlags) {
316+
flags_ = flags_ |
317+
EnvironmentFlags::kOwnsProcessState |
318+
EnvironmentFlags::kOwnsInspector;
319+
}
320+
318321
set_env_vars(per_process::system_environment);
319322
enabled_debug_list_.Parse(this);
320323

@@ -333,6 +336,10 @@ Environment::Environment(IsolateData* isolate_data,
333336

334337
AssignToContext(context, ContextInfo(""));
335338

339+
static uv_once_t init_once = UV_ONCE_INIT;
340+
uv_once(&init_once, InitThreadLocalOnce);
341+
uv_key_set(&thread_local_env, this);
342+
336343
if (tracing::AgentWriterHandle* writer = GetTracingAgentWriter()) {
337344
trace_state_observer_ = std::make_unique<TrackingTraceStateObserver>(this);
338345
if (TracingController* tracing_controller = writer->GetTracingController())
@@ -389,6 +396,9 @@ Environment::Environment(IsolateData* isolate_data,
389396
Environment::~Environment() {
390397
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;
391398

399+
// FreeEnvironment() should have set this.
400+
CHECK(is_stopping());
401+
392402
isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback(
393403
BuildEmbedderGraph, this);
394404

@@ -472,6 +482,15 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
472482
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
473483
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
474484

485+
{
486+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
487+
task_queues_async_initialized_ = true;
488+
if (native_immediates_threadsafe_.size() > 0 ||
489+
native_immediates_interrupts_.size() > 0) {
490+
uv_async_send(&task_queues_async_);
491+
}
492+
}
493+
475494
// Register clean-up cb to be called to clean up the handles
476495
// when the environment is freed, note that they are not cleaned in
477496
// the one environment per process setup, but will be called in
@@ -481,10 +500,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
481500
if (start_profiler_idle_notifier) {
482501
StartProfilerIdleNotifier();
483502
}
484-
485-
static uv_once_t init_once = UV_ONCE_INIT;
486-
uv_once(&init_once, InitThreadLocalOnce);
487-
uv_key_set(&thread_local_env, this);
488503
}
489504

490505
void Environment::ExitEnv() {
@@ -533,6 +548,11 @@ void Environment::RegisterHandleCleanups() {
533548
}
534549

535550
void Environment::CleanupHandles() {
551+
{
552+
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
553+
task_queues_async_initialized_ = false;
554+
}
555+
536556
Isolate::DisallowJavascriptExecutionScope disallow_js(isolate(),
537557
Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
538558

@@ -1101,6 +1121,7 @@ void Environment::CleanupFinalizationGroups() {
11011121
if (try_catch.HasCaught() && !try_catch.HasTerminated())
11021122
errors::TriggerUncaughtException(isolate(), try_catch);
11031123
// Re-schedule the execution of the remainder of the queue.
1124+
CHECK(task_queues_async_initialized_);
11041125
uv_async_send(&task_queues_async_);
11051126
return;
11061127
}

src/env.h

+8-12
Original file line numberDiff line numberDiff line change
@@ -857,12 +857,6 @@ class Environment : public MemoryRetainer {
857857
inline void PushAsyncCallbackScope();
858858
inline void PopAsyncCallbackScope();
859859

860-
enum Flags {
861-
kNoFlags = 0,
862-
kOwnsProcessState = 1 << 1,
863-
kOwnsInspector = 1 << 2,
864-
};
865-
866860
static inline Environment* GetCurrent(v8::Isolate* isolate);
867861
static inline Environment* GetCurrent(v8::Local<v8::Context> context);
868862
static inline Environment* GetCurrent(
@@ -881,8 +875,8 @@ class Environment : public MemoryRetainer {
881875
v8::Local<v8::Context> context,
882876
const std::vector<std::string>& args,
883877
const std::vector<std::string>& exec_args,
884-
Flags flags = Flags(),
885-
uint64_t thread_id = kNoThreadId);
878+
EnvironmentFlags::Flags flags,
879+
ThreadId thread_id);
886880
~Environment() override;
887881

888882
void InitializeLibuv(bool start_profiler_idle_notifier);
@@ -1051,9 +1045,6 @@ class Environment : public MemoryRetainer {
10511045
inline bool has_serialized_options() const;
10521046
inline void set_has_serialized_options(bool has_serialized_options);
10531047

1054-
static uint64_t AllocateThreadId();
1055-
static constexpr uint64_t kNoThreadId = -1;
1056-
10571048
inline bool is_main_thread() const;
10581049
inline bool owns_process_state() const;
10591050
inline bool owns_inspector() const;
@@ -1338,7 +1329,7 @@ class Environment : public MemoryRetainer {
13381329
bool has_serialized_options_ = false;
13391330

13401331
std::atomic_bool can_call_into_js_ { true };
1341-
Flags flags_;
1332+
uint64_t flags_;
13421333
uint64_t thread_id_;
13431334
std::unordered_set<worker::Worker*> sub_worker_contexts_;
13441335

@@ -1440,6 +1431,11 @@ class Environment : public MemoryRetainer {
14401431
Mutex native_immediates_threadsafe_mutex_;
14411432
NativeImmediateQueue native_immediates_threadsafe_;
14421433
NativeImmediateQueue native_immediates_interrupts_;
1434+
// Also guarded by native_immediates_threadsafe_mutex_. This can be used when
1435+
// trying to post tasks from other threads to an Environment, as the libuv
1436+
// handle for the immediate queues (task_queues_async_) may not be initialized
1437+
// yet or already have been destroyed.
1438+
bool task_queues_async_initialized_ = false;
14431439

14441440
void RunAndClearNativeImmediates(bool only_refed = false);
14451441
void RunAndClearInterrupts();

src/node.cc

+7-12
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ MaybeLocal<Value> ExecuteBootstrapper(Environment* env,
198198
int Environment::InitializeInspector(
199199
std::unique_ptr<inspector::ParentInspectorHandle> parent_handle) {
200200
std::string inspector_path;
201+
bool is_main = !parent_handle;
201202
if (parent_handle) {
202-
DCHECK(!is_main_thread());
203203
inspector_path = parent_handle->url();
204204
inspector_agent_->SetParentHandle(std::move(parent_handle));
205205
} else {
@@ -213,7 +213,7 @@ int Environment::InitializeInspector(
213213
inspector_agent_->Start(inspector_path,
214214
options_->debug_options(),
215215
inspector_host_port(),
216-
is_main_thread());
216+
is_main);
217217
if (options_->debug_options().inspector_enabled &&
218218
!inspector_agent_->IsListening()) {
219219
return 12; // Signal internal error
@@ -402,14 +402,18 @@ MaybeLocal<Value> StartExecution(Environment* env, const char* main_script_id) {
402402
ExecuteBootstrapper(env, main_script_id, &parameters, &arguments));
403403
}
404404

405-
MaybeLocal<Value> StartMainThreadExecution(Environment* env) {
405+
MaybeLocal<Value> StartExecution(Environment* env) {
406406
// To allow people to extend Node in different ways, this hook allows
407407
// one to drop a file lib/_third_party_main.js into the build
408408
// directory which will be executed instead of Node's normal loading.
409409
if (NativeModuleEnv::Exists("_third_party_main")) {
410410
return StartExecution(env, "internal/main/run_third_party_main");
411411
}
412412

413+
if (env->worker_context() != nullptr) {
414+
return StartExecution(env, "internal/main/worker_thread");
415+
}
416+
413417
std::string first_argv;
414418
if (env->argv().size() > 1) {
415419
first_argv = env->argv()[1];
@@ -448,15 +452,6 @@ MaybeLocal<Value> StartMainThreadExecution(Environment* env) {
448452
return StartExecution(env, "internal/main/eval_stdin");
449453
}
450454

451-
void LoadEnvironment(Environment* env) {
452-
CHECK(env->is_main_thread());
453-
// TODO(joyeecheung): Not all of the execution modes in
454-
// StartMainThreadExecution() make sense for embedders. Pick the
455-
// useful ones out, and allow embedders to customize the entry
456-
// point more directly without using _third_party_main.js
457-
USE(StartMainThreadExecution(env));
458-
}
459-
460455
#ifdef __POSIX__
461456
typedef void (*sigaction_cb)(int signo, siginfo_t* info, void* ucontext);
462457
#endif

0 commit comments

Comments
 (0)