Skip to content

Commit d58cfa4

Browse files
committed
feat: add message sequence number to comms protocol
1 parent 8b931cf commit d58cfa4

File tree

4 files changed

+50
-15
lines changed

4 files changed

+50
-15
lines changed

packages/SwingSet/src/vats/comms/delivery.js

+18-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { parseLocalSlot, insistLocalType } from './parseLocalSlots';
55
import { makeUndeliverableError } from '../../makeUndeliverableError';
66
import { insistCapData } from '../../capdata';
77
import { insistRemoteType } from './parseRemoteSlot';
8-
import { insistRemoteID } from './remote';
8+
import { insistRemoteID, getRemote } from './remote';
99

1010
const UNDEFINED = harden({
1111
body: JSON.stringify({ '@qclass': 'undefined' }),
@@ -132,14 +132,27 @@ export function makeDeliveryKit(state, syscall, transmit, clistKit, stateKit) {
132132
// changed to assert here that the result parameter is null or undefined.
133133
syscall.resolve([[result, false, UNDEFINED]]);
134134
}
135-
const command = message.split(':', 1)[0];
135+
// The message is preceded by an optional sequence number:
136+
// `$seqnum:$actualMessage` or `:actualMessage`
137+
const colon = message.indexOf(':');
138+
assert(colon >= 0, X`received message ${message} lacks seqNum delimiter`);
139+
const seqNum = message.substring(0, colon);
140+
const remote = getRemote(state, remoteID);
141+
assert(
142+
seqNum === '' || seqNum === `${remote.nextExpectedRecvSeqNum}`,
143+
X`unexpected recv seqNum ${seqNum}`,
144+
);
145+
remote.nextExpectedRecvSeqNum += 1;
146+
147+
const msgBody = message.substring(colon + 1);
148+
const command = msgBody.split(':', 1)[0];
136149
if (command === 'deliver') {
137-
return sendFromRemote(remoteID, message);
150+
return sendFromRemote(remoteID, msgBody);
138151
}
139152
if (command === 'resolve') {
140-
return resolveFromRemote(remoteID, message);
153+
return resolveFromRemote(remoteID, msgBody);
141154
}
142-
assert.fail(X`unrecognized '${command}' in received message ${message}`);
155+
assert.fail(X`unrecognized '${command}' in received message ${msgBody}`);
143156
}
144157

145158
function sendFromRemote(remoteID, message) {

packages/SwingSet/src/vats/comms/dispatch.js

+7-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export function buildCommsDispatch(
1818
_vatPowers,
1919
vatParameters = {},
2020
) {
21-
const { identifierBase = 0 } = vatParameters;
21+
const { identifierBase = 0, sendExplicitSeqNums = true } = vatParameters;
2222
const state = makeState(identifierBase);
2323
const stateKit = makeStateKit(state);
2424
const clistKit = makeCListKit(state, syscall, stateKit);
@@ -27,7 +27,12 @@ export function buildCommsDispatch(
2727
const remote = getRemote(state, remoteID);
2828
// the vat-tp "integrity layer" is a regular vat, so it expects an argument
2929
// encoded as JSON
30-
const args = harden({ body: JSON.stringify([msg]), slots: [] });
30+
const seqNum = sendExplicitSeqNums ? remote.nextSendSeqNum : '';
31+
const args = harden({
32+
body: JSON.stringify([`${seqNum}:${msg}`]),
33+
slots: [],
34+
});
35+
remote.nextSendSeqNum += 1;
3136
syscall.send(remote.transmitterID, 'transmit', args); // sendOnly
3237
}
3338

packages/SwingSet/src/vats/comms/remote.js

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ export function addRemote(state, name, transmitterID) {
5050
nextResolverIndex: state.identifierBase + 30,
5151
nextPromiseIndex: state.identifierBase + 40,
5252
transmitterID,
53+
nextSendSeqNum: 1,
54+
nextExpectedRecvSeqNum: 1,
5355
});
5456
state.identifierBase += 1000;
5557
state.names.set(name, remoteID);

packages/SwingSet/test/test-comms.js

+23-8
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ test('transmit', t => {
7171
t.deepEqual(sends.shift(), [
7272
transmitterID,
7373
'transmit',
74-
encodeArgs('deliver:ro+23:foo:;argsbytes'),
74+
encodeArgs('1:deliver:ro+23:foo:;argsbytes'),
7575
]);
7676

7777
// bob!bar(alice, bob)
@@ -84,7 +84,7 @@ test('transmit', t => {
8484
t.deepEqual(sends.shift(), [
8585
transmitterID,
8686
'transmit',
87-
encodeArgs('deliver:ro+23:bar::ro-20:ro+23;argsbytes'),
87+
encodeArgs('2:deliver:ro+23:bar::ro-20:ro+23;argsbytes'),
8888
]);
8989
// the outbound ro-20 should match an inbound ro+20, both represent 'alice'
9090
t.is(getLocalForRemote(remoteID, 'ro+20'), aliceLocal);
@@ -98,7 +98,7 @@ test('transmit', t => {
9898
t.deepEqual(sends.shift(), [
9999
transmitterID,
100100
'transmit',
101-
encodeArgs('deliver:ro+23:bar::ro-20:ro+23;argsbytes'),
101+
encodeArgs('3:deliver:ro+23:bar::ro-20:ro+23;argsbytes'),
102102
]);
103103

104104
// bob!cat(alice, bob, ayana)
@@ -112,7 +112,7 @@ test('transmit', t => {
112112
t.deepEqual(sends.shift(), [
113113
transmitterID,
114114
'transmit',
115-
encodeArgs('deliver:ro+23:cat::ro-20:ro+23:ro-21;argsbytes'),
115+
encodeArgs('4:deliver:ro+23:cat::ro-20:ro+23:ro-21;argsbytes'),
116116
]);
117117
});
118118

@@ -143,7 +143,7 @@ test('receive', t => {
143143
d.deliver(
144144
receiverID,
145145
'receive',
146-
encodeArgs(`deliver:${bobRemote}:foo:;argsbytes`),
146+
encodeArgs(`1:deliver:${bobRemote}:foo:;argsbytes`),
147147
null,
148148
);
149149
t.deepEqual(sends.shift(), [bobKernel, 'foo', capdata('argsbytes')]);
@@ -152,7 +152,7 @@ test('receive', t => {
152152
d.deliver(
153153
receiverID,
154154
'receive',
155-
encodeArgs(`deliver:${bobRemote}:bar::ro-20:${bobRemote};argsbytes`),
155+
encodeArgs(`2:deliver:${bobRemote}:bar::ro-20:${bobRemote};argsbytes`),
156156
null,
157157
);
158158
const expectedAliceKernel = 'o+31';
@@ -168,10 +168,11 @@ test('receive', t => {
168168
t.is(getLocalForKernel(expectedAliceKernel), 'lo11');
169169

170170
// bob!bar(alice, bob), again, to test stability
171+
// also test absent sequence number
171172
d.deliver(
172173
receiverID,
173174
'receive',
174-
encodeArgs(`deliver:${bobRemote}:bar::ro-20:${bobRemote};argsbytes`),
175+
encodeArgs(`:deliver:${bobRemote}:bar::ro-20:${bobRemote};argsbytes`),
175176
null,
176177
);
177178
t.deepEqual(sends.shift(), [
@@ -185,7 +186,9 @@ test('receive', t => {
185186
d.deliver(
186187
receiverID,
187188
'receive',
188-
encodeArgs(`deliver:${bobRemote}:cat::ro-20:${bobRemote}:ro-21;argsbytes`),
189+
encodeArgs(
190+
`4:deliver:${bobRemote}:cat::ro-20:${bobRemote}:ro-21;argsbytes`,
191+
),
189192
null,
190193
);
191194
t.deepEqual(sends.shift(), [
@@ -194,6 +197,18 @@ test('receive', t => {
194197
capdata('argsbytes', [expectedAliceKernel, bobKernel, expectedAyanaKernel]),
195198
]);
196199

200+
// react to bad sequence number
201+
t.throws(
202+
() =>
203+
d.deliver(
204+
receiverID,
205+
'receive',
206+
encodeArgs(`47:deliver:${bobRemote}:bar::ro-20:${bobRemote};argsbytes`),
207+
null,
208+
),
209+
{ message: /unexpected recv seqNum \(a string\)/ },
210+
);
211+
197212
// make sure comms can tolerate dropExports, even if it's a no-op
198213
d.dropExports([expectedAliceKernel, expectedAyanaKernel]);
199214
});

0 commit comments

Comments
 (0)