Skip to content

Commit 31f4600

Browse files
addaleaxtargos
authored andcommitted
worker: fix interaction of terminate() with messaging port
When a Worker is terminated, its own handle and the public `MessagePort` are `.ref()`’ed, so that all relevant events, including the `'exit'` events, end up being received. However, this is problematic if messages end up being queued from the Worker between the beginning of the `.terminate()` call and its completion, and there are no `'message'` event handlers present at that time. In that situation, currently the messages would not end up being processed, and since the MessagePort is still `.ref()`’ed, it would keep the event loop alive indefinitely. To fix this: - Make sure that all messages end up being received by `drainMessagePort()`, including cases in which the port had been stopped (i.e. there are no `'message'` listeners) and cases in which we exceed the limit for messages being processed in one batch. - Unref the Worker’s internal ports manually after the Worker has exited. Either of these solutions should be solving this on its own, but I think it makes sense to make sure that both of them happen during cleanup. PR-URL: #37319 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent ddae112 commit 31f4600

File tree

4 files changed

+36
-10
lines changed

4 files changed

+36
-10
lines changed

lib/internal/worker.js

+4
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,10 @@ class Worker extends EventEmitter {
251251
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
252252
drainMessagePort(this[kPublicPort]);
253253
drainMessagePort(this[kPort]);
254+
this.removeAllListeners('message');
255+
this.removeAllListeners('messageerrors');
256+
this[kPublicPort].unref();
257+
this[kPort].unref();
254258
this[kDispose]();
255259
if (customErr) {
256260
debug(`[${threadId}] failing with custom error ${customErr} \

src/node_messaging.cc

+13-8
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ MessagePort::MessagePort(Environment* env,
565565
auto onmessage = [](uv_async_t* handle) {
566566
// Called when data has been put into the queue.
567567
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
568-
channel->OnMessage();
568+
channel->OnMessage(MessageProcessingMode::kNormalOperation);
569569
};
570570

571571
CHECK_EQ(uv_async_init(env->event_loop(),
@@ -664,15 +664,17 @@ MessagePort* MessagePort::New(
664664
}
665665

666666
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
667-
bool only_if_receiving) {
667+
MessageProcessingMode mode) {
668668
std::shared_ptr<Message> received;
669669
{
670670
// Get the head of the message queue.
671671
Mutex::ScopedLock lock(data_->mutex_);
672672

673673
Debug(this, "MessagePort has message");
674674

675-
bool wants_message = receiving_messages_ || !only_if_receiving;
675+
bool wants_message =
676+
receiving_messages_ ||
677+
mode == MessageProcessingMode::kForceReadMessages;
676678
// We have nothing to do if:
677679
// - There are no pending messages
678680
// - We are not intending to receive messages, and the message we would
@@ -697,16 +699,18 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
697699
return received->Deserialize(env(), context);
698700
}
699701

700-
void MessagePort::OnMessage() {
702+
void MessagePort::OnMessage(MessageProcessingMode mode) {
701703
Debug(this, "Running MessagePort::OnMessage()");
702704
HandleScope handle_scope(env()->isolate());
703705
Local<Context> context = object(env()->isolate())->CreationContext();
704706

705707
size_t processing_limit;
706-
{
708+
if (mode == MessageProcessingMode::kNormalOperation) {
707709
Mutex::ScopedLock(data_->mutex_);
708710
processing_limit = std::max(data_->incoming_messages_.size(),
709711
static_cast<size_t>(1000));
712+
} else {
713+
processing_limit = std::numeric_limits<size_t>::max();
710714
}
711715

712716
// data_ can only ever be modified by the owner thread, so no need to lock.
@@ -738,7 +742,7 @@ void MessagePort::OnMessage() {
738742
// Catch any exceptions from parsing the message itself (not from
739743
// emitting it) as 'messageeror' events.
740744
TryCatchScope try_catch(env());
741-
if (!ReceiveMessage(context, true).ToLocal(&payload)) {
745+
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
742746
if (try_catch.HasCaught() && !try_catch.HasTerminated())
743747
message_error = try_catch.Exception();
744748
goto reschedule;
@@ -999,7 +1003,7 @@ void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
9991003
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
10001004
MessagePort* port;
10011005
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
1002-
port->OnMessage();
1006+
port->OnMessage(MessageProcessingMode::kForceReadMessages);
10031007
}
10041008

10051009
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
@@ -1018,7 +1022,8 @@ void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
10181022
}
10191023

10201024
MaybeLocal<Value> payload =
1021-
port->ReceiveMessage(port->object()->CreationContext(), false);
1025+
port->ReceiveMessage(port->object()->CreationContext(),
1026+
MessageProcessingMode::kForceReadMessages);
10221027
if (!payload.IsEmpty())
10231028
args.GetReturnValue().Set(payload.ToLocalChecked());
10241029
}

src/node_messaging.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,16 @@ class MessagePort : public HandleWrap {
285285
SET_SELF_SIZE(MessagePort)
286286

287287
private:
288+
enum class MessageProcessingMode {
289+
kNormalOperation,
290+
kForceReadMessages
291+
};
292+
288293
void OnClose() override;
289-
void OnMessage();
294+
void OnMessage(MessageProcessingMode mode);
290295
void TriggerAsync();
291296
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
292-
bool only_if_receiving);
297+
MessageProcessingMode mode);
293298

294299
std::unique_ptr<MessagePortData> data_ = nullptr;
295300
bool receiving_messages_ = false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { Worker } = require('worker_threads');
4+
5+
// The actual test here is that the Worker does not keep the main thread
6+
// running after it has been .terminate()’ed.
7+
8+
const w = new Worker(`
9+
const p = require('worker_threads').parentPort;
10+
while(true) p.postMessage({})`, { eval: true });
11+
w.once('message', () => w.terminate());
12+
w.once('exit', common.mustCall());

0 commit comments

Comments
 (0)