Skip to content

Commit 4314dbf

Browse files
joyeecheungtargos
authored andcommitted
worker: create per-Environment message port after bootstrap
PR-URL: #26593 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 57d302b commit 4314dbf

File tree

5 files changed

+45
-38
lines changed

5 files changed

+45
-38
lines changed

lib/internal/bootstrap/node.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ if (isMainThread) {
146146
setupProcessStdio(getStdout, getStdin, getStderr);
147147
} else {
148148
const { getStdout, getStdin, getStderr } =
149-
workerThreadSetup.initializeWorkerStdio();
149+
workerThreadSetup.createStdioGetters();
150150
setupProcessStdio(getStdout, getStdin, getStderr);
151151
}
152152

lib/internal/process/worker_thread_only.js

+11-18
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,25 @@
22

33
// This file contains process bootstrappers that can only be
44
// run in the worker thread.
5-
const {
6-
getEnvMessagePort
7-
} = internalBinding('worker');
85

96
const {
10-
kWaitingStreams,
11-
ReadableWorkerStdio,
12-
WritableWorkerStdio
7+
createWorkerStdio
138
} = require('internal/worker/io');
149

1510
const {
1611
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
1712
} = require('internal/errors');
18-
const workerStdio = {};
19-
20-
function initializeWorkerStdio() {
21-
const port = getEnvMessagePort();
22-
port[kWaitingStreams] = 0;
23-
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
24-
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
25-
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
2613

14+
let workerStdio;
15+
function lazyWorkerStdio() {
16+
if (!workerStdio) workerStdio = createWorkerStdio();
17+
return workerStdio;
18+
}
19+
function createStdioGetters() {
2720
return {
28-
getStdout() { return workerStdio.stdout; },
29-
getStderr() { return workerStdio.stderr; },
30-
getStdin() { return workerStdio.stdin; }
21+
getStdout() { return lazyWorkerStdio().stdout; },
22+
getStderr() { return lazyWorkerStdio().stderr; },
23+
getStdin() { return lazyWorkerStdio().stdin; }
3124
};
3225
}
3326

@@ -55,7 +48,7 @@ function unavailable(name) {
5548
}
5649

5750
module.exports = {
58-
initializeWorkerStdio,
51+
createStdioGetters,
5952
unavailable,
6053
wrapProcessMethods
6154
};

lib/internal/worker/io.js

+16-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ const {
1111
moveMessagePortToContext,
1212
stopMessagePort
1313
} = internalBinding('messaging');
14-
const { threadId } = internalBinding('worker');
14+
const {
15+
threadId,
16+
getEnvMessagePort
17+
} = internalBinding('worker');
1518

1619
const { Readable, Writable } = require('stream');
1720
const EventEmitter = require('events');
@@ -227,6 +230,16 @@ class WritableWorkerStdio extends Writable {
227230
}
228231
}
229232

233+
function createWorkerStdio() {
234+
const port = getEnvMessagePort();
235+
port[kWaitingStreams] = 0;
236+
return {
237+
stdin: new ReadableWorkerStdio(port, 'stdin'),
238+
stdout: new WritableWorkerStdio(port, 'stdout'),
239+
stderr: new WritableWorkerStdio(port, 'stderr')
240+
};
241+
}
242+
230243
module.exports = {
231244
drainMessagePort,
232245
messageTypes,
@@ -239,5 +252,6 @@ module.exports = {
239252
MessageChannel,
240253
setupPortReferencing,
241254
ReadableWorkerStdio,
242-
WritableWorkerStdio
255+
WritableWorkerStdio,
256+
createWorkerStdio
243257
};

src/node_worker.cc

+16-16
Original file line numberDiff line numberDiff line change
@@ -264,22 +264,6 @@ void Worker::Run() {
264264
Debug(this, "Created Environment for worker with id %llu", thread_id_);
265265
if (is_stopped()) return;
266266
{
267-
HandleScope handle_scope(isolate_);
268-
Mutex::ScopedLock lock(mutex_);
269-
// Set up the message channel for receiving messages in the child.
270-
child_port_ = MessagePort::New(env_.get(),
271-
env_->context(),
272-
std::move(child_port_data_));
273-
// MessagePort::New() may return nullptr if execution is terminated
274-
// within it.
275-
if (child_port_ != nullptr)
276-
env_->set_message_port(child_port_->object(isolate_));
277-
278-
Debug(this, "Created message port for worker %llu", thread_id_);
279-
}
280-
281-
if (is_stopped()) return;
282-
{
283267
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
284268
StartWorkerInspector(env_.get(),
285269
std::move(inspector_parent_handle_),
@@ -291,6 +275,9 @@ void Worker::Run() {
291275
AsyncCallbackScope callback_scope(env_.get());
292276
env_->async_hooks()->push_async_ids(1, 0);
293277
if (!RunBootstrapping(env_.get()).IsEmpty()) {
278+
CreateEnvMessagePort(env_.get());
279+
if (is_stopped()) return;
280+
Debug(this, "Created message port for worker %llu", thread_id_);
294281
USE(StartExecution(env_.get(), "internal/main/worker_thread"));
295282
}
296283

@@ -343,6 +330,19 @@ void Worker::Run() {
343330
Debug(this, "Worker %llu thread stops", thread_id_);
344331
}
345332

333+
void Worker::CreateEnvMessagePort(Environment* env) {
334+
HandleScope handle_scope(isolate_);
335+
Mutex::ScopedLock lock(mutex_);
336+
// Set up the message channel for receiving messages in the child.
337+
child_port_ = MessagePort::New(env,
338+
env->context(),
339+
std::move(child_port_data_));
340+
// MessagePort::New() may return nullptr if execution is terminated
341+
// within it.
342+
if (child_port_ != nullptr)
343+
env->set_message_port(child_port_->object(isolate_));
344+
}
345+
346346
void Worker::JoinThread() {
347347
if (thread_joined_)
348348
return;

src/node_worker.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class Worker : public AsyncWrap {
5050

5151
private:
5252
void OnThreadStopped();
53-
53+
void CreateEnvMessagePort(Environment* env);
5454
const std::string url_;
5555

5656
std::shared_ptr<PerIsolateOptions> per_isolate_opts_;

0 commit comments

Comments
 (0)