Skip to content

Commit 2f3727c

Browse files
committed
fix(swingset): don't deduplicate inbound mailbox messages
The mailbox device tracked the highest inbound message number and ack for each peer, in order to de-duplicate repeated messages and thereby reduce the amount of kernel activity. Each call to `deliverInbound` would return a boolean to indicate whether the messages/ack were new, and thus whether the kernel needed to be cycled. However, the device was holding this tracking data in non-durable state, so if/when the kernel was restarted, the state would be lost. A duplicate message/ack arriving in the restarted process would trigger kernel activity that would not have run in the original process. These extra cranks caused divergence between validators when one of them was restarted and the client sent a duplicate message (such as the pre-emptive `ack` all clients send at startup). The extra crank does not get very far, because vattp does its own deduplication, so the divergence was only visible in the slog. But when #3442 is implemented, even a single extra crank will flag the validator as out of consensus. The fix is to remove the mailbox device's dedup code, and rely upon vattp for this function. The test was also updated to match, and a new test (comparing two parallel kernels, one restarted, one not) was added. closes #3471
1 parent 937b3cf commit 2f3727c

File tree

3 files changed

+144
-159
lines changed

3 files changed

+144
-159
lines changed

packages/SwingSet/src/devices/mailbox-src.js

+10-33
Original file line numberDiff line numberDiff line change
@@ -5,50 +5,27 @@ import { assert, details as X } from '@agoric/assert';
55

66
export function buildRootDeviceNode(tools) {
77
const { SO, getDeviceState, setDeviceState, endowments } = tools;
8-
const highestInboundDelivered = harden(new Map());
9-
const highestInboundAck = harden(new Map());
108

119
let deliverInboundMessages;
1210
let deliverInboundAck;
1311

14-
function inboundCallback(hPeer, hMessages, hAck) {
15-
const peer = `${hPeer}`;
12+
function inboundCallback(peer, messages, ack) {
1613
if (!deliverInboundMessages) {
1714
throw new Error(
1815
`mailbox.inboundCallback(${peer}) called before handler was registered`,
1916
);
2017
}
21-
const ack = Nat(hAck);
22-
let didSomething = false;
23-
24-
let latestMsg = 0;
25-
if (highestInboundDelivered.has(peer)) {
26-
latestMsg = highestInboundDelivered.get(peer);
27-
}
28-
const newMessages = [];
29-
hMessages.forEach(m => {
30-
const [hNum, hMsg] = m;
31-
const num = Nat(hNum);
32-
if (num > latestMsg) {
33-
newMessages.push([num, `${hMsg}`]);
34-
latestMsg = num;
35-
highestInboundDelivered.set(peer, latestMsg);
36-
}
18+
assert.typeof(peer, 'string');
19+
messages.forEach(m => {
20+
Nat(m[0]);
21+
assert.typeof(m[1], 'string');
3722
});
38-
if (newMessages.length) {
39-
deliverInboundMessages(peer, harden(newMessages));
40-
didSomething = true;
41-
}
42-
let latestAck = 0;
43-
if (highestInboundAck.has(peer)) {
44-
latestAck = highestInboundAck.get(peer);
45-
}
46-
if (ack > latestAck) {
47-
highestInboundAck.set(peer, ack);
48-
deliverInboundAck(peer, ack);
49-
didSomething = true;
23+
Nat(ack);
24+
if (messages.length) {
25+
deliverInboundMessages(peer, harden(messages));
5026
}
51-
return didSomething;
27+
deliverInboundAck(peer, ack);
28+
return true; // always didSomething
5229
}
5330
endowments.registerInboundCallback(inboundCallback);
5431

packages/SwingSet/test/device-mailbox/test-device-mailbox.js

+126-92
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { test } from '../../tools/prepare-test-env-ava.js';
55
import path from 'path';
66
import bundleSource from '@agoric/bundle-source';
77
import { provideHostStorage } from '../../src/hostStorage.js';
8-
98
import {
109
initializeSwingset,
1110
makeSwingsetController,
@@ -15,6 +14,7 @@ import {
1514
buildMailboxStateMap,
1615
buildMailbox,
1716
} from '../../src/devices/mailbox.js';
17+
import { capargs } from '../util.js';
1818

1919
test.before(async t => {
2020
const kernelBundles = await buildKernelBundles();
@@ -91,106 +91,140 @@ test('mailbox inbound', async t => {
9191
mailbox: { ...mb.endowments },
9292
};
9393

94-
let rc;
95-
9694
const hostStorage = provideHostStorage();
9795
await initializeSwingset(config, ['mailbox2'], hostStorage, t.context.data);
9896
const c = await makeSwingsetController(hostStorage, deviceEndowments);
9997
await c.run();
100-
rc = mb.deliverInbound(
101-
'peer1',
102-
[
103-
[1, 'msg1'],
104-
[2, 'msg2'],
105-
],
106-
0,
107-
);
108-
t.truthy(rc);
98+
const m1 = [1, 'msg1'];
99+
const m2 = [2, 'msg2'];
100+
const m3 = [3, 'msg3'];
101+
t.true(mb.deliverInbound('peer1', [m1, m2], 0));
109102
await c.run();
110-
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2']);
111-
112-
// delivering the same messages should not trigger sends, but the ack is new
113-
rc = mb.deliverInbound(
114-
'peer1',
115-
[
116-
[1, 'msg1'],
117-
[2, 'msg2'],
118-
],
119-
3,
120-
);
121-
t.truthy(rc);
103+
const expected = ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0'];
104+
t.deepEqual(c.dump().log, expected);
105+
106+
// all messages/acks should be delivered, even duplicates
107+
t.true(mb.deliverInbound('peer1', [m1, m2], 0));
122108
await c.run();
123-
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']);
124-
125-
// no new messages/acks makes deliverInbound return 'false'
126-
rc = mb.deliverInbound(
127-
'peer1',
128-
[
129-
[1, 'msg1'],
130-
[2, 'msg2'],
131-
],
132-
3,
133-
);
134-
t.falsy(rc);
109+
expected.push(...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-0']);
110+
t.deepEqual(c.dump().log, expected);
111+
112+
// new messages too
113+
t.true(mb.deliverInbound('peer1', [m1, m2, m3], 0));
135114
await c.run();
136-
t.deepEqual(c.dump().log, ['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'da-peer1-3']);
137-
138-
// but new messages should be sent
139-
rc = mb.deliverInbound(
140-
'peer1',
141-
[
142-
[1, 'msg1'],
143-
[2, 'msg2'],
144-
[3, 'msg3'],
145-
],
146-
3,
115+
expected.push(
116+
...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-0'],
147117
);
148-
t.truthy(rc);
118+
t.deepEqual(c.dump().log, expected);
119+
120+
// and new ack
121+
t.true(mb.deliverInbound('peer1', [m1, m2, m3], 6));
149122
await c.run();
150-
t.deepEqual(c.dump().log, [
151-
'dm-peer1',
152-
'm-1-msg1',
153-
'm-2-msg2',
154-
'da-peer1-3',
155-
'dm-peer1',
156-
'm-3-msg3',
157-
]);
158-
159-
// and a higher ack should be sent
160-
rc = mb.deliverInbound(
161-
'peer1',
162-
[
163-
[1, 'msg1'],
164-
[2, 'msg2'],
165-
[3, 'msg3'],
166-
],
167-
4,
123+
expected.push(
124+
...['dm-peer1', 'm-1-msg1', 'm-2-msg2', 'm-3-msg3', 'da-peer1-6'],
168125
);
169-
t.truthy(rc);
170-
await c.run();
171-
t.deepEqual(c.dump().log, [
172-
'dm-peer1',
173-
'm-1-msg1',
174-
'm-2-msg2',
175-
'da-peer1-3',
176-
'dm-peer1',
177-
'm-3-msg3',
178-
'da-peer1-4',
179-
]);
180-
181-
rc = mb.deliverInbound('peer2', [[4, 'msg4']], 5);
182-
t.truthy(rc);
126+
t.deepEqual(c.dump().log, expected);
127+
});
128+
129+
async function initializeMailboxKernel(t) {
130+
const s = buildMailboxStateMap();
131+
const mb = buildMailbox(s);
132+
const config = {
133+
bootstrap: 'bootstrap',
134+
vats: {
135+
bootstrap: {
136+
bundle: t.context.data.bootstrap,
137+
},
138+
},
139+
devices: {
140+
mailbox: {
141+
sourceSpec: require.resolve(mb.srcPath),
142+
},
143+
},
144+
};
145+
const hostStorage = provideHostStorage();
146+
await initializeSwingset(
147+
config,
148+
['mailbox-determinism'],
149+
hostStorage,
150+
t.context.data,
151+
);
152+
return hostStorage;
153+
}
154+
155+
async function makeMailboxKernel(hostStorage) {
156+
const s = buildMailboxStateMap();
157+
const mb = buildMailbox(s);
158+
const deviceEndowments = {
159+
mailbox: { ...mb.endowments },
160+
};
161+
const c = await makeSwingsetController(hostStorage, deviceEndowments);
162+
c.pinVatRoot('bootstrap');
183163
await c.run();
184-
t.deepEqual(c.dump().log, [
185-
'dm-peer1',
186-
'm-1-msg1',
187-
'm-2-msg2',
188-
'da-peer1-3',
189-
'dm-peer1',
190-
'm-3-msg3',
191-
'da-peer1-4',
192-
'dm-peer2',
193-
'm-4-msg4',
194-
'da-peer2-5',
195-
]);
164+
return [c, mb];
165+
}
166+
167+
test('mailbox determinism', async t => {
168+
// we run two kernels in parallel
169+
const hostStorage1 = await initializeMailboxKernel(t);
170+
const hostStorage2 = await initializeMailboxKernel(t);
171+
const [c1a, mb1a] = await makeMailboxKernel(hostStorage1);
172+
const [c2, mb2] = await makeMailboxKernel(hostStorage2);
173+
174+
// they get the same inbound message
175+
const msg1 = [[1, 'msg1']];
176+
t.true(mb1a.deliverInbound('peer1', msg1, 0));
177+
await c1a.run();
178+
t.deepEqual(c1a.dump().log, ['comms receive msg1']);
179+
const kp1 = c1a.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
180+
await c1a.run();
181+
t.deepEqual(JSON.parse(c1a.kpResolution(kp1).body), 1);
182+
183+
t.true(mb2.deliverInbound('peer1', msg1, 0));
184+
await c2.run();
185+
t.deepEqual(c2.dump().log, ['comms receive msg1']);
186+
const kp2 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
187+
await c2.run();
188+
t.deepEqual(JSON.parse(c2.kpResolution(kp2).body), 1);
189+
190+
// both should have the same number of cranks
191+
t.is(
192+
hostStorage1.kvStore.get('crankNumber'),
193+
hostStorage2.kvStore.get('crankNumber'),
194+
);
195+
196+
// then one is restarted, but the other keeps running
197+
const [c1b, mb1b] = await makeMailboxKernel(hostStorage1);
198+
199+
// Now we repeat delivery of that message to both. The mailbox should send
200+
// it to vattp, even though it's duplicate, because the mailbox doesn't
201+
// have durable state, and cannot correctly (deterministically) tell that
202+
// it's a duplicate.
203+
t.true(mb1b.deliverInbound('peer1', msg1, 0));
204+
await c1b.run();
205+
// the testlog is part of the ephemeral kernel state, so it will only have
206+
// a record of messages in the second run, however the vat is replayed
207+
// during the second-run startup, so we expect to see one copy of the
208+
// original message, delivered during the second run
209+
t.deepEqual(c1b.dump().log, ['comms receive msg1']);
210+
// but vattp dedups, so only one message should be delivered to comms
211+
const kp3 = c1b.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
212+
await c1b.run();
213+
t.deepEqual(JSON.parse(c1b.kpResolution(kp3).body), 1);
214+
215+
t.true(mb2.deliverInbound('peer1', msg1, 0));
216+
await c2.run();
217+
// the second kernel still has that ephemeral testlog, however the vat is
218+
// still running, so we only see the original message from the first run
219+
t.deepEqual(c2.dump().log, ['comms receive msg1']);
220+
const kp4 = c2.queueToVatRoot('bootstrap', 'getNumReceived', capargs([]));
221+
await c2.run();
222+
t.deepEqual(JSON.parse(c2.kpResolution(kp4).body), 1);
223+
224+
// Both should *still* have the same number of cranks. This is what bug
225+
// #3471 exposed.
226+
t.is(
227+
hostStorage1.kvStore.get('crankNumber'),
228+
hostStorage2.kvStore.get('crankNumber'),
229+
);
196230
});

packages/SwingSet/test/test-vattp.js

+8-34
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,9 @@ test('vattp', async t => {
3232
await c.run();
3333
t.deepEqual(s.exportToData(), {});
3434

35-
t.is(
36-
mb.deliverInbound(
37-
'remote1',
38-
[
39-
[1, 'msg1'],
40-
[2, 'msg2'],
41-
],
42-
0,
43-
),
44-
true,
45-
);
35+
const m1 = [1, 'msg1'];
36+
const m2 = [2, 'msg2'];
37+
t.is(mb.deliverInbound('remote1', [m1, m2], 0), true);
4638
await c.run();
4739
t.deepEqual(c.dump().log, [
4840
'not sending anything',
@@ -51,17 +43,7 @@ test('vattp', async t => {
5143
]);
5244
t.deepEqual(s.exportToData(), { remote1: { outbox: [], inboundAck: 2 } });
5345

54-
t.is(
55-
mb.deliverInbound(
56-
'remote1',
57-
[
58-
[1, 'msg1'],
59-
[2, 'msg2'],
60-
],
61-
0,
62-
),
63-
false,
64-
);
46+
t.is(mb.deliverInbound('remote1', [m1, m2], 0), true);
6547
await c.run();
6648
t.deepEqual(s.exportToData(), { remote1: { outbox: [], inboundAck: 2 } });
6749
});
@@ -104,19 +86,11 @@ test('vattp 2', async t => {
10486
t.deepEqual(c.dump().log, ['ch.receive msg1']);
10587
t.deepEqual(s.exportToData(), { remote1: { outbox: [], inboundAck: 1 } });
10688

107-
t.is(mb.deliverInbound('remote1', [[1, 'msg1']], 1), false);
89+
t.is(mb.deliverInbound('remote1', [[1, 'msg1']], 1), true);
10890

109-
t.is(
110-
mb.deliverInbound(
111-
'remote1',
112-
[
113-
[1, 'msg1'],
114-
[2, 'msg2'],
115-
],
116-
1,
117-
),
118-
true,
119-
);
91+
const m1 = [1, 'msg1'];
92+
const m2 = [2, 'msg2'];
93+
t.is(mb.deliverInbound('remote1', [m1, m2], 1), true);
12094
await c.run();
12195
t.deepEqual(c.dump().log, ['ch.receive msg1', 'ch.receive msg2']);
12296
t.deepEqual(s.exportToData(), { remote1: { outbox: [], inboundAck: 2 } });

0 commit comments

Comments
 (0)