Skip to content

Commit 6b1926f

Browse files
committed
worker: make MessagePort inherit from EventTarget
Use `NodeEventTarget` to provide a mixed `EventEmitter`/`EventTarget` API interface.
1 parent 37fa701 commit 6b1926f

7 files changed

+115
-67
lines changed
+25-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,33 @@
11
'use strict';
2+
const {
3+
SymbolFor,
4+
} = primordials;
5+
26
class MessageEvent {
3-
constructor(data, target) {
7+
constructor(data, target, type) {
48
this.data = data;
59
this.target = target;
10+
this.type = type;
611
}
712
}
813

9-
exports.emitMessage = function(data) {
10-
if (typeof this.onmessage === 'function')
11-
this.onmessage(new MessageEvent(data, this));
14+
const kMessageEventSymbol = SymbolFor('nodejs.internal.MessageEvent');
15+
16+
exports.emitMessage = function(data, type) {
17+
if (typeof this.dispatchEvent === 'function') {
18+
const MessageEvent = this[kMessageEventSymbol];
19+
const event = new MessageEvent(data, this, type);
20+
this.dispatchEvent(event);
21+
return;
22+
}
23+
24+
const event = new MessageEvent(data, this, type);
25+
if (type === 'message') {
26+
if (typeof this.onmessage === 'function')
27+
this.onmessage(event);
28+
} else {
29+
// eslint-disable-next-line no-lonely-if
30+
if (typeof this.onmessageerror === 'function')
31+
this.onmessageerror(event);
32+
}
1233
};

lib/internal/worker/io.js

+69-46
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
ObjectGetPrototypeOf,
99
ObjectSetPrototypeOf,
1010
Symbol,
11+
SymbolFor,
1112
} = primordials;
1213

1314
const {
@@ -24,20 +25,23 @@ const {
2425
stopMessagePort
2526
} = internalBinding('messaging');
2627
const {
27-
threadId,
2828
getEnvMessagePort
2929
} = internalBinding('worker');
3030

3131
const { Readable, Writable } = require('stream');
32-
const EventEmitter = require('events');
32+
const {
33+
Event,
34+
NodeEventTarget,
35+
defineEventHandler,
36+
initNodeEventTarget,
37+
kNewListener,
38+
kRemoveListener,
39+
kPreprocessEvent,
40+
} = require('internal/event_target');
3341
const { inspect } = require('internal/util/inspect');
34-
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
35-
debug = fn;
36-
});
3742

3843
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
3944
const kName = Symbol('kName');
40-
const kOnMessageListener = Symbol('kOnMessageListener');
4145
const kPort = Symbol('kPort');
4246
const kWaitingStreams = Symbol('kWaitingStreams');
4347
const kWritableCallbacks = Symbol('kWritableCallbacks');
@@ -54,55 +58,44 @@ const messageTypes = {
5458
};
5559

5660
// We have to mess with the MessagePort prototype a bit, so that a) we can make
57-
// it inherit from EventEmitter, even though it is a C++ class, and b) we do
61+
// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
5862
// not provide methods that are not present in the Browser and not documented
5963
// on our side (e.g. hasRef).
6064
// Save a copy of the original set of methods as a shallow clone.
6165
const MessagePortPrototype = ObjectCreate(
6266
ObjectGetPrototypeOf(MessagePort.prototype),
6367
ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
6468
// Set up the new inheritance chain.
65-
ObjectSetPrototypeOf(MessagePort, EventEmitter);
66-
ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
69+
ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
70+
ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
6771
// Copy methods that are inherited from HandleWrap, because
6872
// changing the prototype of MessagePort.prototype implicitly removed them.
6973
MessagePort.prototype.ref = MessagePortPrototype.ref;
7074
MessagePort.prototype.unref = MessagePortPrototype.unref;
7175

72-
// A communication channel consisting of a handle (that wraps around an
73-
// uv_async_t) which can receive information from other threads and emits
74-
// .onmessage events, and a function used for sending data to a MessagePort
75-
// in some other thread.
76-
MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
77-
if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
78-
debug(`[${threadId}] received message`, event);
79-
// Emit the deserialized object to userland.
80-
this.emit('message', event.data);
81-
};
82-
83-
// This is for compatibility with the Web's MessagePort API. It makes sense to
84-
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
85-
// `onmessage`, we'll switch over to the Web API model.
86-
ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
87-
enumerable: true,
88-
configurable: true,
89-
get() {
90-
return this[kOnMessageListener];
91-
},
92-
set(value) {
93-
this[kOnMessageListener] = value;
94-
if (typeof value === 'function') {
95-
this.ref();
96-
MessagePortPrototype.start.call(this);
97-
} else {
98-
this.unref();
99-
stopMessagePort(this);
100-
}
76+
class MessageEvent extends Event {
77+
constructor(data, target, type) {
78+
super(type);
79+
this.data = data;
10180
}
102-
});
81+
}
82+
83+
ObjectDefineProperty(
84+
MessagePort.prototype,
85+
SymbolFor('nodejs.internal.MessageEvent'),
86+
{
87+
value: MessageEvent,
88+
configurable: true,
89+
writable: false,
90+
enumerable: false,
91+
});
10392

10493
// This is called from inside the `MessagePort` constructor.
10594
function oninit() {
95+
initNodeEventTarget(this);
96+
// TODO(addaleax): This should be on MessagePort.prototype, but
97+
// defineEventHandler() does not support that.
98+
defineEventHandler(this, 'message');
10699
setupPortReferencing(this, this, 'message');
107100
}
108101

@@ -112,9 +105,21 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
112105
value: oninit
113106
});
114107

108+
MessagePort.prototype[kPreprocessEvent] = function(event) {
109+
if (event.type === 'message' || event.type === 'messageerror')
110+
return event.data;
111+
return event;
112+
};
113+
114+
class MessagePortCloseEvent extends Event {
115+
constructor() {
116+
super('close');
117+
}
118+
}
119+
115120
// This is called after the underlying `uv_async_t` has been closed.
116121
function onclose() {
117-
this.emit('close');
122+
this.dispatchEvent(new MessagePortCloseEvent());
118123
}
119124

120125
ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
@@ -156,18 +161,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
156161
// If there are none or all are removed, unref() the channel so the worker
157162
// can shutdown gracefully.
158163
port.unref();
159-
eventEmitter.on('newListener', (name) => {
160-
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
164+
eventEmitter.on('newListener', function(name) {
165+
if (name === eventName) newListener(eventEmitter.listenerCount(name));
166+
});
167+
eventEmitter.on('removeListener', function(name) {
168+
if (name === eventName) removeListener(eventEmitter.listenerCount(name));
169+
});
170+
const origNewListener = eventEmitter[kNewListener];
171+
eventEmitter[kNewListener] = function(size, type, ...args) {
172+
if (type === eventName) newListener(size - 1);
173+
return origNewListener.call(this, size, type, ...args);
174+
};
175+
const origRemoveListener = eventEmitter[kRemoveListener];
176+
eventEmitter[kRemoveListener] = function(size, type, ...args) {
177+
if (type === eventName) removeListener(size);
178+
return origRemoveListener.call(this, size, type, ...args);
179+
};
180+
181+
function newListener(size) {
182+
if (size === 0) {
161183
port.ref();
162184
MessagePortPrototype.start.call(port);
163185
}
164-
});
165-
eventEmitter.on('removeListener', (name) => {
166-
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
186+
}
187+
188+
function removeListener(size) {
189+
if (size === 0) {
167190
stopMessagePort(port);
168191
port.unref();
169192
}
170-
});
193+
}
171194
}
172195

173196

src/node_messaging.cc

+9-8
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,8 @@ void MessagePort::OnMessage() {
746746

747747
Local<Value> payload;
748748
Local<Value> message_error;
749+
Local<Value> argv[2];
750+
749751
{
750752
// Catch any exceptions from parsing the message itself (not from
751753
// emitting it) as 'messageeror' events.
@@ -764,16 +766,15 @@ void MessagePort::OnMessage() {
764766
continue;
765767
}
766768

767-
if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
769+
argv[0] = payload;
770+
argv[1] = env()->message_string();
771+
772+
if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
768773
reschedule:
769774
if (!message_error.IsEmpty()) {
770-
// This should become a `messageerror` event in the sense of the
771-
// EventTarget API at some point.
772-
Local<Value> argv[] = {
773-
env()->messageerror_string(),
774-
message_error
775-
};
776-
USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
775+
argv[0] = message_error;
776+
argv[1] = env()->messageerror_string();
777+
USE(MakeCallback(emit_message, arraysize(argv), argv));
777778
}
778779

779780
// Re-schedule OnMessage() execution in case of failure.

test/parallel/test-crypto-key-objects-messageport.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ for (const [key, repr] of keys) {
7575

7676
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
7777
// implements EventTarget fully and in a cross-context manner.
78-
port2moved.emit = common.mustCall((name, err) => {
79-
assert.strictEqual(name, 'messageerror');
80-
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
78+
port2moved.onmessageerror = common.mustCall((event) => {
79+
assert.strictEqual(event.data.code,
80+
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
8181
});
8282

8383
port2moved.start();

test/parallel/test-worker-message-port-inspect-during-init-hook.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads');
1010

1111
async_hooks.createHook({
1212
init: common.mustCall((id, type, triggerId, resource) => {
13-
assert.strictEqual(util.inspect(resource),
14-
'MessagePort { active: true, refed: false }');
13+
assert.strictEqual(
14+
util.inspect(resource),
15+
'MessagePort [EventTarget] { active: true, refed: false }');
1516
}, 2)
1617
}).enable();
1718

test/parallel/test-worker-message-port-transfer-filehandle.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ const { once } = require('events');
5656
});
5757
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
5858
// implements EventTarget fully and in a cross-context manner.
59-
port2moved.emit = common.mustCall((name, err) => {
60-
assert.strictEqual(name, 'messageerror');
61-
assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
59+
port2moved.onmessageerror = common.mustCall((event) => {
60+
assert.strictEqual(event.data.code,
61+
'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
6262
});
6363
port2moved.start();
6464

test/parallel/test-worker-message-port.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads');
154154
assert.deepStrictEqual(
155155
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
156156
[
157-
'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
157+
// TODO(addaleax): This should include onmessage (and eventually
158+
// onmessageerror).
159+
'close', 'constructor', 'postMessage', 'ref', 'start',
158160
'unref'
159161
]);
160162
}

0 commit comments

Comments
 (0)