Skip to content

Commit 6e37904

Browse files
committed
feat(swingset): Add Node.js Worker (thread) -based VatManager
This adds a per-vat option to run the vat code in a separate thread, sharing the process with the main (kernel) thread, sending VatDelivery and VatSyscall objects over the postMessage channel. This isn't particularly useful by itself, but it establishes the protocol for running vats in a separate *process*, possibly written in a different language or using a different JS engine (like XS, in #1299). This 'nodeWorker' managertype has several limitations. The shallow ones are: * vatPowers is missing transformTildot, which shouldn't be hard to add * vatPowers.testLog is missing, only used for unit tests so we can probably live without it * vatPowers is missing makeGetMeter/transformMetering (and will probably never get them, since they're only used for within-vat metering and we're trying to get rid of that) * metering is not implemented at all * delivery transcripts (and replay) are not yet implemented Metering shouldn't be too hard to add, although we'll probably make it an option, to avoid paying the instrumented-globals penalty when we aren't using it. We also need to add proper control over vat termination (via meter exhaustion or manually). The deeper limitation is that nodeWorkers cannot block to wait for a syscall (like `callNow`), so they cannot invoke devices. refs #1127 closes #1384
1 parent fb52977 commit 6e37904

File tree

9 files changed

+419
-2
lines changed

9 files changed

+419
-2
lines changed

packages/SwingSet/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
},
2424
"devDependencies": {
2525
"@agoric/install-metering-and-ses": "^0.1.1",
26-
"@agoric/install-ses": "^0.2.0",
2726
"esm": "^3.2.5",
2827
"tap": "^14.10.5",
2928
"tape": "^4.13.2",
@@ -35,6 +34,7 @@
3534
"@agoric/bundle-source": "^1.1.6",
3635
"@agoric/eventual-send": "^0.9.3",
3736
"@agoric/import-bundle": "^0.0.8",
37+
"@agoric/install-ses": "^0.2.0",
3838
"@agoric/marshal": "^0.2.3",
3939
"@agoric/nat": "^2.0.1",
4040
"@agoric/promise-kit": "^0.1.3",

packages/SwingSet/src/controller.js

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import fs from 'fs';
44
import path from 'path';
55
import re2 from 're2';
6+
import { Worker } from 'worker_threads';
67
import * as babelCore from '@babel/core';
78
import * as babelParser from '@agoric/babel-parser';
89
import babelGenerate from '@babel/generator';
@@ -226,13 +227,21 @@ export async function buildVatController(
226227
}`,
227228
);
228229

230+
function makeNodeWorker() {
231+
const supercode = require.resolve(
232+
'./kernel/vatManager/nodeWorkerSupervisor.js',
233+
);
234+
return new Worker(supercode);
235+
}
236+
229237
const kernelEndowments = {
230238
waitUntilQuiescent,
231239
hostStorage,
232240
makeVatEndowments,
233241
replaceGlobalMeter,
234242
transformMetering,
235243
transformTildot,
244+
makeNodeWorker,
236245
};
237246

238247
const kernel = buildKernel(kernelEndowments);

packages/SwingSet/src/kernel/kernel.js

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export default function buildKernel(kernelEndowments) {
3939
replaceGlobalMeter,
4040
transformMetering,
4141
transformTildot,
42+
makeNodeWorker,
4243
} = kernelEndowments;
4344
insistStorageAPI(hostStorage);
4445
const { enhancedCrankBuffer, commitCrank } = wrapStorage(hostStorage);
@@ -565,6 +566,7 @@ export default function buildKernel(kernelEndowments) {
565566
testLog,
566567
transformMetering,
567568
waitUntilQuiescent,
569+
makeNodeWorker,
568570
});
569571

570572
/*

packages/SwingSet/src/kernel/vatManager/factory.js

+17-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import { assert } from '@agoric/assert';
33
import { assertKnownOptions } from '../../assertOptions';
44
import { makeLocalVatManagerFactory } from './localVatManager';
5+
import { makeNodeWorkerVatManagerFactory } from './nodeWorker';
56

67
export function makeVatManagerFactory({
78
allVatPowers,
@@ -10,6 +11,7 @@ export function makeVatManagerFactory({
1011
meterManager,
1112
transformMetering,
1213
waitUntilQuiescent,
14+
makeNodeWorker,
1315
}) {
1416
const localFactory = makeLocalVatManagerFactory({
1517
allVatPowers,
@@ -20,6 +22,11 @@ export function makeVatManagerFactory({
2022
waitUntilQuiescent,
2123
});
2224

25+
const nodeWorkerFactory = makeNodeWorkerVatManagerFactory({
26+
makeNodeWorker,
27+
kernelKeeper,
28+
});
29+
2330
function validateManagerOptions(managerOptions) {
2431
assertKnownOptions(managerOptions, [
2532
'enablePipelining',
@@ -63,7 +70,16 @@ export function makeVatManagerFactory({
6370
return localFactory.createFromBundle(vatID, bundle, managerOptions);
6471
}
6572

66-
throw Error(`unknown manager type ${managerType}, not 'local'`);
73+
if (managerType === 'nodeWorker') {
74+
// 'setup' based vats must be local. TODO: stop using 'setup' in vats,
75+
// but tests and comms-vat still need it
76+
assert(!setup, `setup()-based vats must use a local Manager`);
77+
return nodeWorkerFactory.createFromBundle(vatID, bundle, managerOptions);
78+
}
79+
80+
throw Error(
81+
`unknown manager type ${managerType}, not 'local' or 'nodeWorker'`,
82+
);
6783
}
6884

6985
return harden(vatManagerFactory);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/* global harden */
2+
3+
// import { Worker } from 'worker_threads'; // not from a Compartment
4+
import { assert } from '@agoric/assert';
5+
import { makePromiseKit } from '@agoric/promise-kit';
6+
import { makeTranscriptManager } from './transcript';
7+
8+
import { createSyscall } from './syscall';
9+
10+
// start a "Worker" (Node's tool for starting new threads) and load a bundle
11+
// into it
12+
13+
/*
14+
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
15+
function wait10ms() {
16+
const { promise: queueEmptyP, resolve } = makePromiseKit();
17+
setTimeout(() => resolve(), 10);
18+
return queueEmptyP;
19+
}
20+
*/
21+
22+
// eslint-disable-next-line no-unused-vars
23+
function parentLog(first, ...args) {
24+
// console.error(`--parent: ${first}`, ...args);
25+
}
26+
27+
export function makeNodeWorkerVatManagerFactory(tools) {
28+
const { makeNodeWorker, kernelKeeper } = tools;
29+
30+
function createFromBundle(vatID, bundle, managerOptions) {
31+
const { vatParameters } = managerOptions;
32+
assert(!managerOptions.metered, 'not supported yet');
33+
assert(!managerOptions.notifyTermination, 'not supported yet');
34+
assert(!managerOptions.enableSetup, 'not supported at all');
35+
if (managerOptions.enableInternalMetering) {
36+
// TODO: warn+ignore, rather than throw, because the kernel enables it
37+
// for all vats, because the Spawner still needs it. When the kernel
38+
// stops doing that, turn this into a regular assert
39+
console.log(`node-worker does not support enableInternalMetering`);
40+
}
41+
const vatKeeper = kernelKeeper.allocateVatKeeperIfNeeded(vatID);
42+
const transcriptManager = makeTranscriptManager(
43+
kernelKeeper,
44+
vatKeeper,
45+
vatID,
46+
);
47+
48+
// prepare to accept syscalls from the worker
49+
50+
// TODO: make the worker responsible for checking themselves: we send
51+
// both the delivery and the expected syscalls, and the supervisor
52+
// compares what the bundle does with what it was told to expect.
53+
// Modulo flow control, we just stream transcript entries at the
54+
// worker and eventually get back an "ok" or an error. When we do
55+
// that, doSyscall won't even see replayed syscalls from the worker.
56+
57+
const { doSyscall, setVatSyscallHandler } = createSyscall(
58+
transcriptManager,
59+
);
60+
function handleSyscall(vatSyscallObject) {
61+
const type = vatSyscallObject[0];
62+
if (type === 'callNow') {
63+
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
64+
}
65+
doSyscall(vatSyscallObject);
66+
}
67+
68+
// start the worker and establish a connection
69+
70+
const { promise: workerP, resolve: gotWorker } = makePromiseKit();
71+
72+
function sendToWorker(msg) {
73+
assert(msg instanceof Array);
74+
workerP.then(worker => worker.postMessage(msg));
75+
}
76+
77+
const {
78+
promise: dispatchReadyP,
79+
resolve: dispatchIsReady,
80+
} = makePromiseKit();
81+
let waiting;
82+
83+
function handleUpstream([type, ...args]) {
84+
parentLog(`received`, type);
85+
if (type === 'setUplinkAck') {
86+
parentLog(`upload ready`);
87+
} else if (type === 'gotBundle') {
88+
parentLog(`bundle loaded`);
89+
} else if (type === 'dispatchReady') {
90+
parentLog(`dispatch() ready`);
91+
// wait10ms().then(dispatchIsReady); // stall to let logs get printed
92+
dispatchIsReady();
93+
} else if (type === 'syscall') {
94+
parentLog(`syscall`, args);
95+
const vatSyscallObject = args;
96+
handleSyscall(vatSyscallObject);
97+
} else if (type === 'deliverDone') {
98+
parentLog(`deliverDone`);
99+
if (waiting) {
100+
const resolve = waiting;
101+
waiting = null;
102+
resolve();
103+
}
104+
} else {
105+
parentLog(`unrecognized uplink message ${type}`);
106+
}
107+
}
108+
109+
const worker = makeNodeWorker();
110+
worker.on('message', handleUpstream);
111+
gotWorker(worker);
112+
113+
parentLog(`instructing worker to load bundle..`);
114+
sendToWorker(['setBundle', bundle, vatParameters]);
115+
116+
function deliver(delivery) {
117+
parentLog(`sending delivery`, delivery);
118+
assert(!waiting, `already waiting for delivery`);
119+
const pr = makePromiseKit();
120+
waiting = pr.resolve;
121+
sendToWorker(['deliver', ...delivery]);
122+
return pr.promise;
123+
}
124+
125+
function replayTranscript() {
126+
throw Error(`replayTranscript not yet implemented`);
127+
}
128+
129+
function shutdown() {
130+
// this returns a Promise that fulfills with 1 if we used
131+
// worker.terminate(), otherwise with the `exitCode` passed to
132+
// `process.exit(exitCode)` within the worker.
133+
return worker.terminate();
134+
}
135+
136+
const manager = harden({
137+
replayTranscript,
138+
setVatSyscallHandler,
139+
deliver,
140+
shutdown,
141+
});
142+
143+
return dispatchReadyP.then(() => manager);
144+
}
145+
146+
return harden({ createFromBundle });
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/* global harden */
2+
// this file is loaded at the start of a new Worker, which makes it a new JS
3+
// environment (with it's own Realm), so we must install-ses too.
4+
import '@agoric/install-ses';
5+
import { parentPort } from 'worker_threads';
6+
import anylogger from 'anylogger';
7+
8+
import { assert } from '@agoric/assert';
9+
import { importBundle } from '@agoric/import-bundle';
10+
import { Remotable, getInterfaceOf } from '@agoric/marshal';
11+
import { HandledPromise } from '@agoric/eventual-send';
12+
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
13+
import { makeLiveSlots } from '../liveSlots';
14+
15+
// eslint-disable-next-line no-unused-vars
16+
function workerLog(first, ...args) {
17+
// console.error(`---worker: ${first}`, ...args);
18+
}
19+
20+
workerLog(`supervisor started`);
21+
22+
function makeConsole(tag) {
23+
const log = anylogger(tag);
24+
const cons = {};
25+
for (const level of ['debug', 'log', 'info', 'warn', 'error']) {
26+
cons[level] = log[level];
27+
}
28+
return harden(cons);
29+
}
30+
31+
function runAndWait(f, errmsg) {
32+
Promise.resolve()
33+
.then(f)
34+
.then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err));
35+
return waitUntilQuiescent();
36+
}
37+
38+
function sendUplink(msg) {
39+
assert(msg instanceof Array, `msg must be an Array`);
40+
parentPort.postMessage(msg);
41+
}
42+
43+
let dispatch;
44+
45+
async function doProcess(dispatchRecord, errmsg) {
46+
const dispatchOp = dispatchRecord[0];
47+
const dispatchArgs = dispatchRecord.slice(1);
48+
workerLog(`runAndWait`);
49+
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
50+
workerLog(`doProcess done`);
51+
}
52+
53+
function doNotify(vpid, vp) {
54+
const errmsg = `vat.promise[${vpid}] ${vp.state} failed`;
55+
switch (vp.state) {
56+
case 'fulfilledToPresence':
57+
return doProcess(['notifyFulfillToPresence', vpid, vp.slot], errmsg);
58+
case 'redirected':
59+
throw new Error('not implemented yet');
60+
case 'fulfilledToData':
61+
return doProcess(['notifyFulfillToData', vpid, vp.data], errmsg);
62+
case 'rejected':
63+
return doProcess(['notifyReject', vpid, vp.data], errmsg);
64+
default:
65+
throw Error(`unknown promise state '${vp.state}'`);
66+
}
67+
}
68+
69+
let syscallLog;
70+
parentPort.on('message', ([type, ...margs]) => {
71+
workerLog(`received`, type);
72+
if (type === 'start') {
73+
// TODO: parent should send ['start', vatID]
74+
workerLog(`got start`);
75+
sendUplink(['gotStart']);
76+
} else if (type === 'setBundle') {
77+
const [bundle, vatParameters] = margs;
78+
const endowments = {
79+
console: makeConsole(`SwingSet:vatWorker`),
80+
HandledPromise,
81+
};
82+
importBundle(bundle, { endowments }).then(vatNS => {
83+
workerLog(`got vatNS:`, Object.keys(vatNS).join(','));
84+
sendUplink(['gotBundle']);
85+
86+
function doSyscall(vatSyscallObject) {
87+
sendUplink(['syscall', ...vatSyscallObject]);
88+
}
89+
const syscall = harden({
90+
send: (...args) => doSyscall(['send', ...args]),
91+
callNow: (..._args) => {
92+
throw Error(`nodeWorker cannot syscall.callNow`);
93+
},
94+
subscribe: (...args) => doSyscall(['subscribe', ...args]),
95+
fulfillToData: (...args) => doSyscall(['fulfillToData', ...args]),
96+
fulfillToPresence: (...args) =>
97+
doSyscall(['fulfillToPresence', ...args]),
98+
reject: (...args) => doSyscall(['reject', ...args]),
99+
});
100+
101+
const state = null;
102+
const vatID = 'demo-vatID';
103+
// todo: maybe add transformTildot, makeGetMeter/transformMetering to
104+
// vatPowers, but only if options tell us they're wanted. Maybe
105+
// transformTildot should be async and outsourced to the kernel
106+
// process/thread.
107+
const vatPowers = { Remotable, getInterfaceOf };
108+
dispatch = makeLiveSlots(
109+
syscall,
110+
state,
111+
vatNS.buildRootObject,
112+
vatID,
113+
vatPowers,
114+
vatParameters,
115+
);
116+
workerLog(`got dispatch:`, Object.keys(dispatch).join(','));
117+
sendUplink(['dispatchReady']);
118+
});
119+
} else if (type === 'deliver') {
120+
if (!dispatch) {
121+
workerLog(`error: deliver before dispatchReady`);
122+
return;
123+
}
124+
const [dtype, ...dargs] = margs;
125+
if (dtype === 'message') {
126+
const [targetSlot, msg] = dargs;
127+
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
128+
doProcess(
129+
['deliver', targetSlot, msg.method, msg.args, msg.result],
130+
errmsg,
131+
).then(() => {
132+
sendUplink(['deliverDone']);
133+
});
134+
} else if (dtype === 'notify') {
135+
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
136+
} else {
137+
throw Error(`bad delivery type ${dtype}`);
138+
}
139+
} else {
140+
workerLog(`unrecognized downlink message ${type}`);
141+
}
142+
});

0 commit comments

Comments
 (0)