Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: add ports property to MessageEvents #37538

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions lib/internal/per_context/messageport.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,30 @@ const {
} = primordials;

class MessageEvent {
constructor(data, target, type) {
constructor(data, target, type, ports) {
this.data = data;
this.target = target;
this.type = type;
this.ports = ports ?? [];
}
}

const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
const kCurrentlyReceivingPorts =
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');

exports.emitMessage = function(data, type) {
exports.emitMessage = function(data, ports, type) {
if (typeof this[kHybridDispatch] === 'function') {
this[kHybridDispatch](data, type, undefined);
this[kCurrentlyReceivingPorts] = ports;
try {
this[kHybridDispatch](data, type, undefined);
} finally {
this[kCurrentlyReceivingPorts] = undefined;
}
return;
}

const event = new MessageEvent(data, this, type);
const event = new MessageEvent(data, this, type, ports);
if (type === 'message') {
if (typeof this.onmessage === 'function')
this.onmessage(event);
Expand Down
8 changes: 7 additions & 1 deletion lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const {
ObjectSetPrototypeOf,
ReflectApply,
Symbol,
SymbolFor,
} = primordials;

const {
Expand Down Expand Up @@ -70,6 +71,8 @@ const kWritableCallbacks = Symbol('kWritableCallbacks');
const kSource = Symbol('kSource');
const kStartedReading = Symbol('kStartedReading');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kCurrentlyReceivingPorts =
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');

const messageTypes = {
UP_AND_RUNNING: 'upAndRunning',
Expand Down Expand Up @@ -150,7 +153,9 @@ ObjectDefineProperty(
if (type !== 'message' && type !== 'messageerror') {
return ReflectApply(originalCreateEvent, this, arguments);
}
return new MessageEvent(type, { data });
const ports = this[kCurrentlyReceivingPorts];
this[kCurrentlyReceivingPorts] = undefined;
return new MessageEvent(type, { data, ports });
},
configurable: false,
writable: false,
Expand All @@ -161,6 +166,7 @@ ObjectDefineProperty(
function oninit() {
initNodeEventTarget(this);
setupPortReferencing(this, this, 'message');
this[kCurrentlyReceivingPorts] = undefined;
}

defineEventHandler(MessagePort.prototype, 'message');
Expand Down
44 changes: 36 additions & 8 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
} // anonymous namespace

MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
Local<Context> context,
Local<Value>* port_list) {
Context::Scope context_scope(context);

CHECK(!IsCloseMessage());
if (port_list != nullptr && !transferables_.empty()) {
// Need to create this outside of the EscapableHandleScope, but inside
// the Context::Scope.
*port_list = Array::New(env->isolate());
}

EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

// Create all necessary objects for transferables, e.g. MessagePort handles.
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
Expand All @@ -146,10 +153,27 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
});

for (uint32_t i = 0; i < transferables_.size(); ++i) {
HandleScope handle_scope(env->isolate());
TransferData* data = transferables_[i].get();
host_objects[i] = data->Deserialize(
env, context, std::move(transferables_[i]));
if (!host_objects[i]) return {};
if (port_list != nullptr) {
// If we gather a list of all message ports, and this transferred object
// is a message port, add it to that list. This is a bit of an odd case
// of special handling for MessagePorts (as opposed to applying to all
// transferables), but it's required for spec compliancy.
DCHECK((*port_list)->IsArray());
Local<Array> port_list_array = port_list->As<Array>();
Local<Object> obj = host_objects[i]->object();
if (env->message_port_constructor_template()->HasInstance(obj)) {
if (port_list_array->Set(context,
port_list_array->Length(),
obj).IsNothing()) {
return {};
}
}
}
}
transferables_.clear();

Expand Down Expand Up @@ -664,7 +688,8 @@ MessagePort* MessagePort::New(
}

MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
MessageProcessingMode mode) {
MessageProcessingMode mode,
Local<Value>* port_list) {
std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Expand Down Expand Up @@ -696,7 +721,7 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,

if (!env()->can_call_into_js()) return MaybeLocal<Value>();

return received->Deserialize(env(), context);
return received->Deserialize(env(), context, port_list);
}

void MessagePort::OnMessage(MessageProcessingMode mode) {
Expand Down Expand Up @@ -735,14 +760,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);

Local<Value> payload;
Local<Value> port_list = Undefined(env()->isolate());
Local<Value> message_error;
Local<Value> argv[2];
Local<Value> argv[3];

{
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
TryCatchScope try_catch(env());
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
message_error = try_catch.Exception();
goto reschedule;
Expand All @@ -757,13 +783,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
}

argv[0] = payload;
argv[1] = env()->message_string();
argv[1] = port_list;
argv[2] = env()->message_string();

if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
reschedule:
if (!message_error.IsEmpty()) {
argv[0] = message_error;
argv[1] = env()->messageerror_string();
argv[1] = Undefined(env()->isolate());
argv[2] = env()->messageerror_string();
USE(MakeCallback(emit_message, arraysize(argv), argv));
}

Expand Down
12 changes: 8 additions & 4 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class Message : public MemoryRetainer {

// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
v8::Local<v8::Context> context);
v8::MaybeLocal<v8::Value> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value>* port_list = nullptr);

// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
Expand Down Expand Up @@ -293,8 +295,10 @@ class MessagePort : public HandleWrap {
void OnClose() override;
void OnMessage(MessageProcessingMode mode);
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
MessageProcessingMode mode);
v8::MaybeLocal<v8::Value> ReceiveMessage(
v8::Local<v8::Context> context,
MessageProcessingMode mode,
v8::Local<v8::Value>* port_list = nullptr);

std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
Expand Down
12 changes: 9 additions & 3 deletions test/parallel/test-worker-message-port-move.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ vm.runInContext('(' + function() {

assert(!(port instanceof MessagePort));
assert.strictEqual(port.onmessage, undefined);
port.onmessage = function({ data }) {
port.onmessage = function({ data, ports }) {
assert(data instanceof Object);
port.postMessage(data);
assert(ports instanceof Array);
assert.strictEqual(ports.length, 1);
assert.strictEqual(ports[0], data.p);
assert(!(data.p instanceof MessagePort));
port.postMessage({});
};
port.start();
}
Expand All @@ -55,8 +59,10 @@ vm.runInContext('(' + function() {
}
} + ')()', context);

const otherChannel = new MessageChannel();
port2.on('message', common.mustCall((msg) => {
assert(msg instanceof Object);
port2.close();
otherChannel.port2.close();
}));
port2.postMessage({});
port2.postMessage({ p: otherChannel.port1 }, [ otherChannel.port1 ]);
14 changes: 14 additions & 0 deletions test/parallel/test-worker-message-port.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const { MessageChannel, MessagePort } = require('worker_threads');
port1.onmessage = common.mustCall((message) => {
assert.strictEqual(message.data, 4);
assert.strictEqual(message.target, port1);
assert.deepStrictEqual(message.ports, []);
port2.close(common.mustCall());
});

Expand Down Expand Up @@ -161,6 +162,19 @@ const { MessageChannel, MessagePort } = require('worker_threads');
port1.close();
}

{
// Test MessageEvent#ports
const c1 = new MessageChannel();
const c2 = new MessageChannel();
c1.port1.postMessage({ port: c2.port2 }, [ c2.port2 ]);
c1.port2.addEventListener('message', common.mustCall((ev) => {
assert.strictEqual(ev.ports.length, 1);
assert.strictEqual(ev.ports[0].constructor, MessagePort);
c1.port1.close();
c2.port1.close();
}));
}

{
assert.deepStrictEqual(
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
Expand Down