Skip to content

Commit 8024ffb

Browse files
addaleaxdanielleadams
authored andcommitted
worker: add ports property to MessageEvents
Add `ev.ports` for spec compliancy. Since we only emit the raw `data` value, and only create the `MessageEvent` instance if there are EventTarget-style listeners, we store the ports list temporarily on the MessagePort object itself, so that we can look it up when we need to create the event object. Fixes: #37358 PR-URL: #37538 Reviewed-By: Michaël Zasso <targos@protonmail.com> Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
1 parent f4fd3fb commit 8024ffb

File tree

6 files changed

+86
-20
lines changed

6 files changed

+86
-20
lines changed

lib/internal/per_context/messageport.js

+12-4
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,30 @@ const {
44
} = primordials;
55

66
class MessageEvent {
7-
constructor(data, target, type) {
7+
constructor(data, target, type, ports) {
88
this.data = data;
99
this.target = target;
1010
this.type = type;
11+
this.ports = ports ?? [];
1112
}
1213
}
1314

1415
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
16+
const kCurrentlyReceivingPorts =
17+
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
1518

16-
exports.emitMessage = function(data, type) {
19+
exports.emitMessage = function(data, ports, type) {
1720
if (typeof this[kHybridDispatch] === 'function') {
18-
this[kHybridDispatch](data, type, undefined);
21+
this[kCurrentlyReceivingPorts] = ports;
22+
try {
23+
this[kHybridDispatch](data, type, undefined);
24+
} finally {
25+
this[kCurrentlyReceivingPorts] = undefined;
26+
}
1927
return;
2028
}
2129

22-
const event = new MessageEvent(data, this, type);
30+
const event = new MessageEvent(data, this, type, ports);
2331
if (type === 'message') {
2432
if (typeof this.onmessage === 'function')
2533
this.onmessage(event);

lib/internal/worker/io.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const {
1515
ObjectSetPrototypeOf,
1616
ReflectApply,
1717
Symbol,
18+
SymbolFor,
1819
} = primordials;
1920

2021
const {
@@ -70,6 +71,8 @@ const kWritableCallbacks = Symbol('kWritableCallbacks');
7071
const kSource = Symbol('kSource');
7172
const kStartedReading = Symbol('kStartedReading');
7273
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
74+
const kCurrentlyReceivingPorts =
75+
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
7376

7477
const messageTypes = {
7578
UP_AND_RUNNING: 'upAndRunning',
@@ -150,7 +153,9 @@ ObjectDefineProperty(
150153
if (type !== 'message' && type !== 'messageerror') {
151154
return ReflectApply(originalCreateEvent, this, arguments);
152155
}
153-
return new MessageEvent(type, { data });
156+
const ports = this[kCurrentlyReceivingPorts];
157+
this[kCurrentlyReceivingPorts] = undefined;
158+
return new MessageEvent(type, { data, ports });
154159
},
155160
configurable: false,
156161
writable: false,
@@ -161,6 +166,7 @@ ObjectDefineProperty(
161166
function oninit() {
162167
initNodeEventTarget(this);
163168
setupPortReferencing(this, this, 'message');
169+
this[kCurrentlyReceivingPorts] = undefined;
164170
}
165171

166172
defineEventHandler(MessagePort.prototype, 'message');

src/node_messaging.cc

+36-8
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
126126
} // anonymous namespace
127127

128128
MaybeLocal<Value> Message::Deserialize(Environment* env,
129-
Local<Context> context) {
129+
Local<Context> context,
130+
Local<Value>* port_list) {
131+
Context::Scope context_scope(context);
132+
130133
CHECK(!IsCloseMessage());
134+
if (port_list != nullptr && !transferables_.empty()) {
135+
// Need to create this outside of the EscapableHandleScope, but inside
136+
// the Context::Scope.
137+
*port_list = Array::New(env->isolate());
138+
}
131139

132140
EscapableHandleScope handle_scope(env->isolate());
133-
Context::Scope context_scope(context);
134141

135142
// Create all necessary objects for transferables, e.g. MessagePort handles.
136143
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
@@ -146,10 +153,27 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
146153
});
147154

148155
for (uint32_t i = 0; i < transferables_.size(); ++i) {
156+
HandleScope handle_scope(env->isolate());
149157
TransferData* data = transferables_[i].get();
150158
host_objects[i] = data->Deserialize(
151159
env, context, std::move(transferables_[i]));
152160
if (!host_objects[i]) return {};
161+
if (port_list != nullptr) {
162+
// If we gather a list of all message ports, and this transferred object
163+
// is a message port, add it to that list. This is a bit of an odd case
164+
// of special handling for MessagePorts (as opposed to applying to all
165+
// transferables), but it's required for spec compliancy.
166+
DCHECK((*port_list)->IsArray());
167+
Local<Array> port_list_array = port_list->As<Array>();
168+
Local<Object> obj = host_objects[i]->object();
169+
if (env->message_port_constructor_template()->HasInstance(obj)) {
170+
if (port_list_array->Set(context,
171+
port_list_array->Length(),
172+
obj).IsNothing()) {
173+
return {};
174+
}
175+
}
176+
}
153177
}
154178
transferables_.clear();
155179

@@ -664,7 +688,8 @@ MessagePort* MessagePort::New(
664688
}
665689

666690
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
667-
MessageProcessingMode mode) {
691+
MessageProcessingMode mode,
692+
Local<Value>* port_list) {
668693
std::shared_ptr<Message> received;
669694
{
670695
// Get the head of the message queue.
@@ -696,7 +721,7 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
696721

697722
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
698723

699-
return received->Deserialize(env(), context);
724+
return received->Deserialize(env(), context, port_list);
700725
}
701726

702727
void MessagePort::OnMessage(MessageProcessingMode mode) {
@@ -735,14 +760,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
735760
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
736761

737762
Local<Value> payload;
763+
Local<Value> port_list = Undefined(env()->isolate());
738764
Local<Value> message_error;
739-
Local<Value> argv[2];
765+
Local<Value> argv[3];
740766

741767
{
742768
// Catch any exceptions from parsing the message itself (not from
743769
// emitting it) as 'messageeror' events.
744770
TryCatchScope try_catch(env());
745-
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
771+
if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
746772
if (try_catch.HasCaught() && !try_catch.HasTerminated())
747773
message_error = try_catch.Exception();
748774
goto reschedule;
@@ -757,13 +783,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
757783
}
758784

759785
argv[0] = payload;
760-
argv[1] = env()->message_string();
786+
argv[1] = port_list;
787+
argv[2] = env()->message_string();
761788

762789
if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
763790
reschedule:
764791
if (!message_error.IsEmpty()) {
765792
argv[0] = message_error;
766-
argv[1] = env()->messageerror_string();
793+
argv[1] = Undefined(env()->isolate());
794+
argv[2] = env()->messageerror_string();
767795
USE(MakeCallback(emit_message, arraysize(argv), argv));
768796
}
769797

src/node_messaging.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ class Message : public MemoryRetainer {
6262

6363
// Deserialize the contained JS value. May only be called once, and only
6464
// after Serialize() has been called (e.g. by another thread).
65-
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
66-
v8::Local<v8::Context> context);
65+
v8::MaybeLocal<v8::Value> Deserialize(
66+
Environment* env,
67+
v8::Local<v8::Context> context,
68+
v8::Local<v8::Value>* port_list = nullptr);
6769

6870
// Serialize a JS value, and optionally transfer objects, into this message.
6971
// The Message object retains ownership of all transferred objects until
@@ -293,8 +295,10 @@ class MessagePort : public HandleWrap {
293295
void OnClose() override;
294296
void OnMessage(MessageProcessingMode mode);
295297
void TriggerAsync();
296-
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
297-
MessageProcessingMode mode);
298+
v8::MaybeLocal<v8::Value> ReceiveMessage(
299+
v8::Local<v8::Context> context,
300+
MessageProcessingMode mode,
301+
v8::Local<v8::Value>* port_list = nullptr);
298302

299303
std::unique_ptr<MessagePortData> data_ = nullptr;
300304
bool receiving_messages_ = false;

test/parallel/test-worker-message-port-move.js

+9-3
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,13 @@ vm.runInContext('(' + function() {
3434

3535
assert(!(port instanceof MessagePort));
3636
assert.strictEqual(port.onmessage, undefined);
37-
port.onmessage = function({ data }) {
37+
port.onmessage = function({ data, ports }) {
3838
assert(data instanceof Object);
39-
port.postMessage(data);
39+
assert(ports instanceof Array);
40+
assert.strictEqual(ports.length, 1);
41+
assert.strictEqual(ports[0], data.p);
42+
assert(!(data.p instanceof MessagePort));
43+
port.postMessage({});
4044
};
4145
port.start();
4246
}
@@ -55,8 +59,10 @@ vm.runInContext('(' + function() {
5559
}
5660
} + ')()', context);
5761

62+
const otherChannel = new MessageChannel();
5863
port2.on('message', common.mustCall((msg) => {
5964
assert(msg instanceof Object);
6065
port2.close();
66+
otherChannel.port2.close();
6167
}));
62-
port2.postMessage({});
68+
port2.postMessage({ p: otherChannel.port1 }, [ otherChannel.port1 ]);

test/parallel/test-worker-message-port.js

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const { MessageChannel, MessagePort } = require('worker_threads');
3434
port1.onmessage = common.mustCall((message) => {
3535
assert.strictEqual(message.data, 4);
3636
assert.strictEqual(message.target, port1);
37+
assert.deepStrictEqual(message.ports, []);
3738
port2.close(common.mustCall());
3839
});
3940

@@ -161,6 +162,19 @@ const { MessageChannel, MessagePort } = require('worker_threads');
161162
port1.close();
162163
}
163164

165+
{
166+
// Test MessageEvent#ports
167+
const c1 = new MessageChannel();
168+
const c2 = new MessageChannel();
169+
c1.port1.postMessage({ port: c2.port2 }, [ c2.port2 ]);
170+
c1.port2.addEventListener('message', common.mustCall((ev) => {
171+
assert.strictEqual(ev.ports.length, 1);
172+
assert.strictEqual(ev.ports[0].constructor, MessagePort);
173+
c1.port1.close();
174+
c2.port1.close();
175+
}));
176+
}
177+
164178
{
165179
assert.deepStrictEqual(
166180
Object.getOwnPropertyNames(MessagePort.prototype).sort(),

0 commit comments

Comments
 (0)