Skip to content

Commit 655c1c9

Browse files
joyeecheungBridgeAR
authored andcommittedJan 17, 2019
process: move worker bootstrap code into worker_thread_only.js
Move worker bootstrap code into worker_thread_only.js from internal/worker.js since they are only run once during bootstrap. PR-URL: #25199 Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 9480e1b commit 655c1c9

File tree

2 files changed

+90
-83
lines changed

2 files changed

+90
-83
lines changed
 

‎lib/internal/process/worker_thread_only.js

+89-8
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,21 @@ const {
77
threadId
88
} = internalBinding('worker');
99

10-
const debug = require('util').debuglog('worker');
11-
1210
const {
11+
messageTypes,
12+
kStdioWantsMoreDataCallback,
1313
kWaitingStreams,
1414
ReadableWorkerStdio,
1515
WritableWorkerStdio
1616
} = require('internal/worker/io');
1717

18-
const {
19-
createMessageHandler,
20-
createWorkerFatalExeception
21-
} = require('internal/worker');
18+
let debuglog;
19+
function debug(...args) {
20+
if (!debuglog) {
21+
debuglog = require('util').debuglog('worker');
22+
}
23+
return debuglog(...args);
24+
}
2225

2326
const workerStdio = {};
2427

@@ -36,12 +39,90 @@ function initializeWorkerStdio() {
3639
};
3740
}
3841

42+
function createMessageHandler(port) {
43+
const publicWorker = require('worker_threads');
44+
45+
return function(message) {
46+
if (message.type === messageTypes.LOAD_SCRIPT) {
47+
const { filename, doEval, workerData, publicPort, hasStdin } = message;
48+
publicWorker.parentPort = publicPort;
49+
publicWorker.workerData = workerData;
50+
51+
if (!hasStdin)
52+
workerStdio.stdin.push(null);
53+
54+
debug(`[${threadId}] starts worker script ${filename} ` +
55+
`(eval = ${eval}) at cwd = ${process.cwd()}`);
56+
port.unref();
57+
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
58+
if (doEval) {
59+
const { evalScript } = require('internal/process/execution');
60+
evalScript('[worker eval]', filename);
61+
} else {
62+
process.argv[1] = filename; // script filename
63+
require('module').runMain();
64+
}
65+
return;
66+
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
67+
const { stream, chunk, encoding } = message;
68+
workerStdio[stream].push(chunk, encoding);
69+
return;
70+
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
71+
const { stream } = message;
72+
workerStdio[stream][kStdioWantsMoreDataCallback]();
73+
return;
74+
}
75+
76+
require('assert').fail(`Unknown worker message type ${message.type}`);
77+
};
78+
}
79+
80+
// XXX(joyeecheung): this has to be returned as an anonymous function
81+
// wrapped in a closure, see the comment of the original
82+
// process._fatalException in lib/internal/process/execution.js
83+
function createWorkerFatalExeception(port) {
84+
const {
85+
fatalException: originalFatalException
86+
} = require('internal/process/execution');
87+
88+
return (error) => {
89+
debug(`[${threadId}] gets fatal exception`);
90+
let caught = false;
91+
try {
92+
caught = originalFatalException.call(this, error);
93+
} catch (e) {
94+
error = e;
95+
}
96+
debug(`[${threadId}] fatal exception caught = ${caught}`);
97+
98+
if (!caught) {
99+
let serialized;
100+
try {
101+
const { serializeError } = require('internal/error-serdes');
102+
serialized = serializeError(error);
103+
} catch {}
104+
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
105+
if (serialized)
106+
port.postMessage({
107+
type: messageTypes.ERROR_MESSAGE,
108+
error: serialized
109+
});
110+
else
111+
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
112+
113+
const { clearAsyncIdStack } = require('internal/async_hooks');
114+
clearAsyncIdStack();
115+
116+
process.exit();
117+
}
118+
};
119+
}
120+
39121
function setup() {
40122
debug(`[${threadId}] is setting up worker child environment`);
41123

42124
const port = getEnvMessagePort();
43-
const publicWorker = require('worker_threads');
44-
port.on('message', createMessageHandler(publicWorker, port, workerStdio));
125+
port.on('message', createMessageHandler(port));
45126
port.start();
46127

47128
return {

‎lib/internal/worker.js

+1-75
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ const {
1010
ERR_WORKER_UNSUPPORTED_EXTENSION,
1111
} = require('internal/errors').codes;
1212
const { validateString } = require('internal/validators');
13-
const { clearAsyncIdStack } = require('internal/async_hooks');
1413

1514
const {
1615
drainMessagePort,
@@ -24,7 +23,7 @@ const {
2423
ReadableWorkerStdio,
2524
WritableWorkerStdio,
2625
} = require('internal/worker/io');
27-
const { serializeError, deserializeError } = require('internal/error-serdes');
26+
const { deserializeError } = require('internal/error-serdes');
2827
const { pathToFileURL } = require('url');
2928

3029
const {
@@ -219,77 +218,6 @@ class Worker extends EventEmitter {
219218
}
220219
}
221220

222-
function createMessageHandler(publicWorker, port, workerStdio) {
223-
return function(message) {
224-
if (message.type === messageTypes.LOAD_SCRIPT) {
225-
const { filename, doEval, workerData, publicPort, hasStdin } = message;
226-
publicWorker.parentPort = publicPort;
227-
publicWorker.workerData = workerData;
228-
229-
if (!hasStdin)
230-
workerStdio.stdin.push(null);
231-
232-
debug(`[${threadId}] starts worker script ${filename} ` +
233-
`(eval = ${eval}) at cwd = ${process.cwd()}`);
234-
port.unref();
235-
port.postMessage({ type: messageTypes.UP_AND_RUNNING });
236-
if (doEval) {
237-
const { evalScript } = require('internal/process/execution');
238-
evalScript('[worker eval]', filename);
239-
} else {
240-
process.argv[1] = filename; // script filename
241-
require('module').runMain();
242-
}
243-
return;
244-
} else if (message.type === messageTypes.STDIO_PAYLOAD) {
245-
const { stream, chunk, encoding } = message;
246-
workerStdio[stream].push(chunk, encoding);
247-
return;
248-
} else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
249-
const { stream } = message;
250-
workerStdio[stream][kStdioWantsMoreDataCallback]();
251-
return;
252-
}
253-
254-
assert.fail(`Unknown worker message type ${message.type}`);
255-
};
256-
}
257-
258-
function createWorkerFatalExeception(port) {
259-
const {
260-
fatalException: originalFatalException
261-
} = require('internal/process/execution');
262-
263-
return function(error) {
264-
debug(`[${threadId}] gets fatal exception`);
265-
let caught = false;
266-
try {
267-
caught = originalFatalException.call(this, error);
268-
} catch (e) {
269-
error = e;
270-
}
271-
debug(`[${threadId}] fatal exception caught = ${caught}`);
272-
273-
if (!caught) {
274-
let serialized;
275-
try {
276-
serialized = serializeError(error);
277-
} catch {}
278-
debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
279-
if (serialized)
280-
port.postMessage({
281-
type: messageTypes.ERROR_MESSAGE,
282-
error: serialized
283-
});
284-
else
285-
port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
286-
clearAsyncIdStack();
287-
288-
process.exit();
289-
}
290-
};
291-
}
292-
293221
function pipeWithoutWarning(source, dest) {
294222
const sourceMaxListeners = source._maxListeners;
295223
const destMaxListeners = dest._maxListeners;
@@ -303,8 +231,6 @@ function pipeWithoutWarning(source, dest) {
303231
}
304232

305233
module.exports = {
306-
createMessageHandler,
307-
createWorkerFatalExeception,
308234
threadId,
309235
Worker,
310236
isMainThread

0 commit comments

Comments
 (0)