diff --git a/benchmark/worker/bench-eventlooputil.js b/benchmark/worker/bench-eventlooputil.js
new file mode 100644
index 00000000000000..2d59f9f19ed563
--- /dev/null
+++ b/benchmark/worker/bench-eventlooputil.js
@@ -0,0 +1,61 @@
+'use strict';
+
+const common = require('../common.js');
+const { Worker, parentPort } = require('worker_threads');
+
+if (process.argv[2] === 'idle cats') {
+  return parentPort.once('message', () => {});
+}
+
+const bench = common.createBenchmark(main, {
+  n: [1e6],
+  method: [
+    'ELU_simple',
+    'ELU_passed',
+  ],
+});
+
+function main({ method, n }) {
+  switch (method) {
+    case 'ELU_simple':
+      benchELUSimple(n);
+      break;
+    case 'ELU_passed':
+      benchELUPassed(n);
+      break;
+    default:
+      throw new Error(`Unsupported method ${method}`);
+  }
+}
+
+function benchELUSimple(n) {
+  const worker = new Worker(__filename, { argv: ['idle cats'] });
+
+  spinUntilIdle(worker, () => {
+    bench.start();
+    for (let i = 0; i < n; i++)
+      worker.performance.eventLoopUtilization();
+    bench.end(n);
+    worker.postMessage('bye');
+  });
+}
+
+function benchELUPassed(n) {
+  const worker = new Worker(__filename, { argv: ['idle cats'] });
+
+  spinUntilIdle(worker, () => {
+    let elu = worker.performance.eventLoopUtilization();
+    bench.start();
+    for (let i = 0; i < n; i++)
+      elu = worker.performance.eventLoopUtilization(elu);
+    bench.end(n);
+    worker.postMessage('bye');
+  });
+}
+
+function spinUntilIdle(w, cb) {
+  const t = w.performance.eventLoopUtilization();
+  if (t.idle + t.active > 0)
+    return process.nextTick(cb);
+  setTimeout(() => spinUntilIdle(w, cb), 1);
+}
diff --git a/doc/api/perf_hooks.md b/doc/api/perf_hooks.md
index 267d71344bb0b7..bb22c09fddd7de 100644
--- a/doc/api/perf_hooks.md
+++ b/doc/api/perf_hooks.md
@@ -72,8 +72,11 @@ added:
 The `eventLoopUtilization()` method returns an object that contains the
 cumulative duration of time the event loop has been both idle and active as a
 high resolution milliseconds timer. The `utilization` value is the calculated
-Event Loop Utilization (ELU). If bootstrapping has not yet finished, the
-properties have the value of `0`.
+Event Loop Utilization (ELU).
+
+If bootstrapping has not yet finished on the main thread, the properties have
+the value of `0`. The ELU is immediately available on [Worker threads][] since
+bootstrap happens within the event loop.
 
 Both `utilization1` and `utilization2` are optional parameters.
 
@@ -766,6 +769,7 @@ require('some-module');
 [Performance Timeline]: https://w3c.github.io/performance-timeline/
 [User Timing]: https://www.w3.org/TR/user-timing/
 [Web Performance APIs]: https://w3c.github.io/perf-timing-primer/
+[Worker threads]: worker_threads.md#worker_threads_worker_threads
 [`'exit'`]: process.md#process_event_exit
 [`child_process.spawnSync()`]: child_process.md#child_process_child_process_spawnsync_command_args_options
 [`process.hrtime()`]: process.md#process_process_hrtime_time
diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md
index d361e5f5822839..4b04930db2fc6c 100644
--- a/doc/api/worker_threads.md
+++ b/doc/api/worker_threads.md
@@ -799,6 +799,65 @@ If the Worker thread is no longer running, which may occur before the
 [`'exit'` event][] is emitted, the returned `Promise` will be rejected
 immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.
 
+### `worker.performance`
+<!-- YAML
+added: REPLACEME
+-->
+
+An object that can be used to query performance information from a worker
+instance. Similar to [`perf_hooks.performance`][].
+
+#### `performance.eventLoopUtilization([utilization1[, utilization2]])`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `utilization1` {Object} The result of a previous call to
+  `eventLoopUtilization()`.
+* `utilization2` {Object} The result of a previous call to
+  `eventLoopUtilization()` prior to `utilization1`.
+* Returns {Object}
+  * `idle` {number}
+  * `active` {number}
+  * `utilization` {number}
+
+The same call as [`perf_hooks` `eventLoopUtilization()`][], except the values
+of the worker instance are returned.
+
+One difference is that, unlike the main thread, bootstrapping within a worker
+is done within the event loop. So the event loop utilization will be
+immediately available once the worker's script begins execution.
+
+An `idle` time that does not increase does not indicate that the worker is
+stuck in bootstrap. The following example shows how the worker's entire
+lifetime will never accumulate any `idle` time, but it is still able to
+process messages.
+
+```js
+const { Worker, isMainThread, parentPort } = require('worker_threads');
+
+if (isMainThread) {
+  const worker = new Worker(__filename);
+  setInterval(() => {
+    worker.postMessage('hi');
+    console.log(worker.performance.eventLoopUtilization());
+  }, 100).unref();
+  return;
+}
+
+parentPort.on('message', () => console.log('msg')).unref();
+(function r(n) {
+  if (--n < 0) return;
+  const t = Date.now();
+  while (Date.now() - t < 300);
+  setImmediate(r, n);
+})(10);
+```
+
+The event loop utilization of a worker is available only after the [`'online'`
+event][] is emitted, and if called before this, or after the [`'exit'`
+event][], then all properties have the value of `0`.
+
 ### `worker.postMessage(value[, transferList])`
 <!-- YAML
 added: v10.5.0
@@ -920,6 +979,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
 [Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
 [`'close'` event]: #worker_threads_event_close
 [`'exit'` event]: #worker_threads_event_exit
+[`'online'` event]: #worker_threads_event_online
 [`ArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/ArrayBuffer
 [`AsyncResource`]: async_hooks.md#async_hooks_class_asyncresource
 [`Buffer.allocUnsafe()`]: buffer.md#buffer_static_method_buffer_allocunsafe_size
@@ -940,6 +1000,8 @@ active handle in the event system. If the worker is already `unref()`ed calling
 [`fs.close()`]: fs.md#fs_fs_close_fd_callback
 [`fs.open()`]: fs.md#fs_fs_open_path_flags_mode_callback
 [`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
+[`perf_hooks.performance`]: perf_hooks.md#perf_hooks_perf_hooks_performance
+[`perf_hooks` `eventLoopUtilization()`]: perf_hooks.md#perf_hooks_performance_eventlooputilization_utilization1_utilization2
 [`port.on('message')`]: #worker_threads_event_message
 [`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
 [`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
diff --git a/lib/internal/worker.js b/lib/internal/worker.js
index e97cd7efea22df..b3c310e93e8ff1 100644
--- a/lib/internal/worker.js
+++ b/lib/internal/worker.js
@@ -20,6 +20,7 @@ const {
 const EventEmitter = require('events');
 const assert = require('internal/assert');
 const path = require('path');
+const { timeOrigin } = internalBinding('performance');
 
 const errorCodes = require('internal/errors').codes;
 const {
@@ -70,6 +71,8 @@ const kOnMessage = Symbol('kOnMessage');
 const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
 const kOnErrorMessage = Symbol('kOnErrorMessage');
 const kParentSideStdio = Symbol('kParentSideStdio');
+const kLoopStartTime = Symbol('kLoopStartTime');
+const kIsOnline = Symbol('kIsOnline');
 const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
 
 let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
@@ -223,6 +226,12 @@ class Worker extends EventEmitter {
         null,
       hasStdin: !!options.stdin
     }, transferList);
+    // Use this to cache the Worker's loopStart value once available.
+    this[kLoopStartTime] = -1;
+    this[kIsOnline] = false;
+    this.performance = {
+      eventLoopUtilization: eventLoopUtilization.bind(this),
+    };
     // Actually start the new thread now that everything is in place.
     this[kHandle].startThread();
   }
@@ -254,6 +263,7 @@ class Worker extends EventEmitter {
   [kOnMessage](message) {
     switch (message.type) {
       case messageTypes.UP_AND_RUNNING:
+        this[kIsOnline] = true;
        return this.emit('online');
       case messageTypes.COULD_NOT_SERIALIZE_ERROR:
         return this[kOnCouldNotSerializeErr]();
@@ -415,6 +425,52 @@ function makeResourceLimits(float64arr) {
   };
 }
 
+function eventLoopUtilization(util1, util2) {
+  // TODO(trevnorris): Works to solve the thread-safe read/write issue of
+  // loopTime, but has the drawback that it can't be set until the event loop
+  // has had a chance to turn. So it will be impossible to read the ELU of
+  // a worker thread immediately after it's been created.
+  if (!this[kIsOnline] || !this[kHandle]) {
+    return { idle: 0, active: 0, utilization: 0 };
+  }
+
+  // Cache loopStart, since it's only written to once.
+  if (this[kLoopStartTime] === -1) {
+    this[kLoopStartTime] = this[kHandle].loopStartTime();
+    if (this[kLoopStartTime] === -1)
+      return { idle: 0, active: 0, utilization: 0 };
+  }
+
+  if (util2) {
+    const idle = util1.idle - util2.idle;
+    const active = util1.active - util2.active;
+    return { idle, active, utilization: active / (idle + active) };
+  }
+
+  const idle = this[kHandle].loopIdleTime();
+
+  // Using performance.now() here is fine since it's always the time from
+  // the beginning of the process, which is why it needs to be offset by the
+  // loopStart time (which is also calculated from the beginning of the
+  // process).
+  const active = now() - this[kLoopStartTime] - idle;
+
+  if (!util1) {
+    return { idle, active, utilization: active / (idle + active) };
+  }
+
+  const idle_delta = idle - util1.idle;
+  const active_delta = active - util1.active;
+  const utilization = active_delta / (idle_delta + active_delta);
+  return { idle: idle_delta, active: active_delta, utilization };
+}
+
+// Duplicate performance.now() logic so we don't need to require perf_hooks.
+function now() {
+  const hr = process.hrtime();
+  return (hr[0] * 1000 + hr[1] / 1e6) - timeOrigin;
+}
+
 module.exports = {
   ownsProcessState,
   isMainThread,
diff --git a/src/node_worker.cc b/src/node_worker.cc
index e28aab7fb24c7f..7369e13768e2d9 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -746,6 +746,39 @@ void Worker::TakeHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
   args.GetReturnValue().Set(scheduled ? taker->object() : Local<Object>());
 }
 
+void Worker::LoopIdleTime(const FunctionCallbackInfo<Value>& args) {
+  Worker* w;
+  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+  Mutex::ScopedLock lock(w->mutex_);
+  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
+  // before locking the mutex is a race condition. So manually do the same
+  // check.
+  if (w->stopped_ || w->env_ == nullptr)
+    return args.GetReturnValue().Set(-1);
+
+  uint64_t idle_time = uv_metrics_idle_time(w->env_->event_loop());
+  args.GetReturnValue().Set(1.0 * idle_time / 1e6);
+}
+
+void Worker::LoopStartTime(const FunctionCallbackInfo<Value>& args) {
+  Worker* w;
+  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+  Mutex::ScopedLock lock(w->mutex_);
+  // Using w->is_stopped() here leads to a deadlock, and checking is_stopped()
+  // before locking the mutex is a race condition. So manually do the same
+  // check.
+  if (w->stopped_ || w->env_ == nullptr)
+    return args.GetReturnValue().Set(-1);
+
+  double loop_start_time = w->env_->performance_state()->milestones[
+      node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START];
+  CHECK_GE(loop_start_time, 0);
+  args.GetReturnValue().Set(
+      (loop_start_time - node::performance::timeOrigin) / 1e6);
+}
+
 namespace {
 
 // Return the MessagePort that is global for this Environment and communicates
@@ -779,6 +812,8 @@ void InitWorker(Local<Object> target,
   env->SetProtoMethod(w, "unref", Worker::Unref);
   env->SetProtoMethod(w, "getResourceLimits", Worker::GetResourceLimits);
   env->SetProtoMethod(w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
+  env->SetProtoMethod(w, "loopIdleTime", Worker::LoopIdleTime);
+  env->SetProtoMethod(w, "loopStartTime", Worker::LoopStartTime);
 
   Local<String> workerString =
       FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
@@ -845,6 +880,8 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
   registry->Register(Worker::Unref);
   registry->Register(Worker::GetResourceLimits);
   registry->Register(Worker::TakeHeapSnapshot);
+  registry->Register(Worker::LoopIdleTime);
+  registry->Register(Worker::LoopStartTime);
 }
 
 }  // anonymous namespace
diff --git a/src/node_worker.h b/src/node_worker.h
index 50611fb3f2ea10..2c65f0e1a83bbe 100644
--- a/src/node_worker.h
+++ b/src/node_worker.h
@@ -66,6 +66,8 @@ class Worker : public AsyncWrap {
       const v8::FunctionCallbackInfo<v8::Value>& args);
   v8::Local<v8::Float64Array> GetResourceLimits(v8::Isolate* isolate) const;
   static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
 
  private:
   void CreateEnvMessagePort(Environment* env);
diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js
index 403d8e0993cf74..2a811d65cc2633 100644
--- a/test/parallel/test-bootstrap-modules.js
+++ b/test/parallel/test-bootstrap-modules.js
@@ -108,6 +108,7 @@ const expectedModules = new Set([
 if (!common.isMainThread) {
   [
     'Internal Binding messaging',
+    'Internal Binding performance',
     'Internal Binding symbols',
     'Internal Binding worker',
     'NativeModule internal/streams/duplex',
diff --git a/test/parallel/test-performance-eventlooputil.js b/test/parallel/test-performance-eventlooputil.js
index 1a5d86db750a94..5d039209dadde7 100644
--- a/test/parallel/test-performance-eventlooputil.js
+++ b/test/parallel/test-performance-eventlooputil.js
@@ -1,8 +1,9 @@
 'use strict';
 
-require('../common');
+const { mustCall } = require('../common');
 
-const TIMEOUT = 50;
+const TIMEOUT = 10;
+const SPIN_DUR = 50;
 
 const assert = require('assert');
 const { performance } = require('perf_hooks');
@@ -21,13 +22,15 @@ if (nodeTiming.loopStart === -1) {
                          { idle: 0, active: 0, utilization: 0 });
 }
 
-// Place in setTimeout() to make sure there is some idle time, but not going to
-// assert this since it could make the test flaky.
-setTimeout(() => {
+setTimeout(mustCall(function r() {
   const t = Date.now();
   const elu1 = eventLoopUtilization();
 
-  while (Date.now() - t < 50) { }
+  // Force idle time to accumulate before allowing the test to continue.
+  if (elu1.idle <= 0)
+    return setTimeout(mustCall(r), 5);
+
+  while (Date.now() - t < SPIN_DUR) { }
 
   const elu2 = eventLoopUtilization();
   const elu3 = eventLoopUtilization(elu1);
@@ -38,12 +41,13 @@ setTimeout(() => {
   assert.strictEqual(elu3.utilization, 1);
   assert.strictEqual(elu4.utilization, 1);
   assert.strictEqual(elu2.active - elu1.active, elu4.active);
-  assert.ok(elu2.active > elu3.active);
-  assert.ok(elu2.active > elu4.active);
-  assert.ok(elu3.active > elu4.active);
+  assert.ok(elu3.active > SPIN_DUR - 10, `${elu3.active} <= ${SPIN_DUR - 10}`);
+  assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`);
+  assert.ok(elu2.active > elu3.active, `${elu2.active} <= ${elu3.active}`);
+  assert.ok(elu2.active > elu4.active, `${elu2.active} <= ${elu4.active}`);
 
-  setTimeout(runIdleTimeTest, TIMEOUT);
-}, 5);
+  setTimeout(mustCall(runIdleTimeTest), TIMEOUT);
+}), 5);
 
 function runIdleTimeTest() {
   const idleTime = nodeTiming.idleTime;
@@ -55,7 +59,7 @@ function runIdleTimeTest() {
   assert.strictEqual(elu1.idle, idleTime);
   assert.strictEqual(elu1.utilization, elu1.active / sum);
 
-  setTimeout(runCalcTest, TIMEOUT, elu1);
+  setTimeout(mustCall(runCalcTest), TIMEOUT, elu1);
 }
 
 function runCalcTest(elu1) {
@@ -65,18 +69,20 @@ function runCalcTest(elu1) {
   const active_delta = elu2.active - elu1.active;
   const idle_delta = elu2.idle - elu1.idle;
 
-  assert.ok(elu2.idle >= 0);
-  assert.ok(elu2.active >= 0);
-  assert.ok(elu3.idle >= 0);
-  assert.ok(elu3.active >= 0);
-  assert.ok(elu2.idle + elu2.active > elu1.idle + elu2.active);
-  assert.ok(elu2.idle + elu2.active >= now - nodeTiming.loopStart);
+  assert.ok(elu2.idle >= 0, `${elu2.idle} < 0`);
+  assert.ok(elu2.active >= 0, `${elu2.active} < 0`);
+  assert.ok(elu3.idle >= 0, `${elu3.idle} < 0`);
+  assert.ok(elu3.active >= 0, `${elu3.active} < 0`);
+  assert.ok(elu2.idle + elu2.active > elu1.idle + elu1.active,
+            `${elu2.idle + elu2.active} <= ${elu1.idle + elu1.active}`);
+  assert.ok(elu2.idle + elu2.active >= now - nodeTiming.loopStart,
+            `${elu2.idle + elu2.active} < ${now - nodeTiming.loopStart}`);
   assert.strictEqual(elu3.active, elu2.active - elu1.active);
   assert.strictEqual(elu3.idle, elu2.idle - elu1.idle);
   assert.strictEqual(elu3.utilization,
                      active_delta / (idle_delta + active_delta));
 
-  setImmediate(runWorkerTest);
+  setImmediate(mustCall(runWorkerTest));
 }
 
 function runWorkerTest() {
@@ -90,10 +96,11 @@ function runWorkerTest() {
   const elu1 = eventLoopUtilization();
   const worker = new Worker(__filename, { argv: [ 'iamalive' ] });
 
-  worker.on('message', (msg) => {
+  worker.on('message', mustCall((msg) => {
     const elu2 = eventLoopUtilization(elu1);
     const data = JSON.parse(msg);
 
-    assert.ok(elu2.active + elu2.idle > data.active + data.idle);
-  });
+    assert.ok(elu2.active + elu2.idle > data.active + data.idle,
+              `${elu2.active + elu2.idle} <= ${data.active + data.idle}`);
+  }));
 }
diff --git a/test/parallel/test-worker-eventlooputil.js b/test/parallel/test-worker-eventlooputil.js
new file mode 100644
index 00000000000000..6759bd362cfd05
--- /dev/null
+++ b/test/parallel/test-worker-eventlooputil.js
@@ -0,0 +1,116 @@
+'use strict';
+
+const { mustCall, mustCallAtLeast } = require('../common');
+
+const assert = require('assert');
+const {
+  Worker,
+  MessageChannel,
+  MessagePort,
+  parentPort,
+} = require('worker_threads');
+const { eventLoopUtilization, now } = require('perf_hooks').performance;
+
+// Use argv to detect whether this file is running as the main test or as a
+// Worker that the test spawned from this same file.
+if (process.argv[2] === 'iamalive') {
+  const iaElu = idleActive(eventLoopUtilization());
+  // Checks that the worker bootstrap is running after the event loop started.
+  assert.ok(iaElu > 0, `${iaElu} <= 0`);
+  parentPort.once('message', mustCall((msg) => {
+    assert.ok(msg.metricsCh instanceof MessagePort);
+    msg.metricsCh.on('message', mustCallAtLeast(workerOnMetricsMsg, 1));
+  }));
+  return;
+}
+
+function workerOnMetricsMsg(msg) {
+  if (msg.cmd === 'close') {
+    return this.close();
+  }
+
+  if (msg.cmd === 'elu') {
+    return this.postMessage(eventLoopUtilization());
+  }
+
+  if (msg.cmd === 'spin') {
+    const t = now();
+    while (now() - t < msg.dur);
+    return this.postMessage(eventLoopUtilization());
+  }
+}
+
+let worker;
+let metricsCh;
+let mainElu;
+let workerELU;
+
+(function r() {
+  // Force some idle time to accumulate before proceeding with test.
+  if (eventLoopUtilization().idle <= 0)
+    return setTimeout(mustCall(r), 5);
+
+  worker = new Worker(__filename, { argv: [ 'iamalive' ] });
+  metricsCh = new MessageChannel();
+  worker.postMessage({ metricsCh: metricsCh.port1 }, [ metricsCh.port1 ]);
+
+  workerELU = worker.performance.eventLoopUtilization;
+  mainElu = eventLoopUtilization();
+  metricsCh.port2.once('message', mustCall(checkWorkerIdle));
+  metricsCh.port2.postMessage({ cmd: 'elu' });
+  // Make sure it's still safe to call eventLoopUtilization() after the worker
+  // has been closed.
+  worker.on('exit', mustCall(() => {
+    assert.deepStrictEqual(worker.performance.eventLoopUtilization(),
+                           { idle: 0, active: 0, utilization: 0 });
+  }));
+})();
+
+
+function checkWorkerIdle(wElu) {
+  const tmpMainElu = eventLoopUtilization(mainElu);
+  const perfWorkerElu = workerELU();
+  const eluDiff = eventLoopUtilization(perfWorkerElu, mainElu);
+
+  assert.strictEqual(idleActive(eluDiff),
+                     (perfWorkerElu.active - mainElu.active) +
+                     (perfWorkerElu.idle - mainElu.idle));
+  assert.ok(idleActive(wElu) > 0, `${idleActive(wElu)} <= 0`);
+  assert.ok(idleActive(workerELU(wElu)) > 0,
+            `${idleActive(workerELU(wElu))} <= 0`);
+  assert.ok(idleActive(perfWorkerElu) > idleActive(wElu),
+            `${idleActive(perfWorkerElu)} <= ${idleActive(wElu)}`);
+  assert.ok(idleActive(tmpMainElu) > idleActive(perfWorkerElu),
+            `${idleActive(tmpMainElu)} <= ${idleActive(perfWorkerElu)}`);
+
+  wElu = workerELU();
+  setTimeout(mustCall(() => {
+    wElu = workerELU(wElu);
+    // Some clocks fire early. Removing a few milliseconds to cover that.
+    assert.ok(idleActive(wElu) >= 45, `${idleActive(wElu)} < 45`);
+    // Cutting the idle time in half since it's possible that the call took a
+    // lot of resources to process?
+    assert.ok(wElu.idle >= 25, `${wElu.idle} < 25`);
+
+    checkWorkerActive();
+  }), 50);
+}
+
+function checkWorkerActive() {
+  const w = workerELU();
+
+  metricsCh.port2.postMessage({ cmd: 'spin', dur: 50 });
+  metricsCh.port2.once('message', (wElu) => {
+    const w2 = workerELU(w);
+
+    assert.ok(w2.active >= 50, `${w2.active} < 50`);
+    assert.ok(idleActive(wElu) > idleActive(w2),
+              `${idleActive(wElu)} <= ${idleActive(w2)}`);
+
+    metricsCh.port2.postMessage({ cmd: 'close' });
+  });
+}
+
+function idleActive(elu) {
+  return elu.idle + elu.active;
+}
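
For reviewers, here is a minimal usage sketch (not part of the patch) of the `worker.performance.eventLoopUtilization()` API introduced above, as seen from the main thread. The `./service.js` filename, the one-second sampling interval, and the `0.9` busy threshold are illustrative assumptions, not values taken from the patch.

```js
'use strict';
const { Worker } = require('worker_threads');

// Hypothetical worker script; substitute any real worker entry point.
const worker = new Worker('./service.js');

let last = worker.performance.eventLoopUtilization();
setInterval(() => {
  // Take a fresh absolute reading of the worker's ELU.
  const current = worker.performance.eventLoopUtilization();
  // Passing two readings diffs them directly, mirroring the two-argument
  // path in lib/internal/worker.js above; before the 'online' event all
  // values are 0.
  const delta = worker.performance.eventLoopUtilization(current, last);
  last = current;
  if (delta.utilization > 0.9)
    console.warn(`Worker event loop busy: ELU ${delta.utilization.toFixed(2)}`);
}, 1000).unref();
```

Passing a single previous reading instead returns the delta since that reading, which is the pattern the `ELU_passed` benchmark above exercises.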