Skip to content

Commit d6c40e9

Browse files
committed
feat(notifier): subscribeLatest iterators retry when broken by vat upgrade
subscribeEach iterators continue to fail in that scenario, because they cannot guarantee absence of gaps. Fixes #5185
1 parent 0e49c36 commit d6c40e9

File tree

4 files changed

+166
-42
lines changed

4 files changed

+166
-42
lines changed

packages/notifier/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"@agoric/vat-data": "^0.4.3",
4242
"@endo/far": "^0.2.14",
4343
"@endo/marshal": "^0.8.1",
44+
"@endo/nat": "^4.1.0",
4445
"@endo/promise-kit": "^0.2.52"
4546
},
4647
"devDependencies": {

packages/notifier/src/subscribe.js

+51-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,48 @@
11
import { E, Far } from '@endo/far';
2+
import { isObject } from '@endo/marshal';
3+
import { isNat } from '@endo/nat';
24

35
import './types-ambient.js';
46

57
const sink = () => {};
68

9+
/**
10+
* Check the promise returned by a function for rejection by vat upgrade,
11+
* and refetch upon encountering that condition.
12+
*
13+
* @template T
14+
* @param {() => ERef<T>} thunk
15+
* @returns {Promise<T>}
16+
*/
17+
const reconnectAsNeeded = async thunk => {
18+
let lastVersion;
19+
// End synchronous prelude.
20+
await null;
21+
for (;;) {
22+
try {
23+
// eslint-disable-next-line no-await-in-loop, @jessie.js/no-nested-await
24+
const result = await thunk();
25+
return result;
26+
} catch (err) {
27+
/** @see processUpgrade in {@link ../../SwingSet/src/kernel/kernel.js} */
28+
if (isObject(err) && err.name === 'vatUpgraded') {
29+
const { incarnationNumber: version } = err;
30+
if (
31+
isNat(version) &&
32+
(lastVersion === undefined || version > lastVersion)
33+
) {
34+
// We don't expect another upgrade in between receiving
35+
// a disconnection and re-requesting an update, but must
36+
// nevertheless be prepared for that.
37+
lastVersion = version;
38+
continue;
39+
}
40+
}
41+
throw err;
42+
}
43+
}
44+
};
45+
746
/**
847
* Create a near iterable that corresponds to a potentially far one.
948
*
@@ -53,7 +92,10 @@ const makeEachIterator = pubList => {
5392
* provides "prefix lossy" iterations of the underlying PublicationList.
5493
* By "prefix lossy", we mean that you may miss everything published before
5594
* you ask the returned iterable for an iterator. But the returned iterator
56-
* will enumerate each thing published from that iterator's starting point.
95+
* will enumerate each thing published from that iterator's starting point
96+
* up to a disconnection result indicating upgrade of the producer
97+
* (which breaks the gap-free guarantee and therefore terminates any active
98+
* iterator while still supporting creation of new iterators).
5799
*
58100
* If the underlying PublicationList is terminated, that terminal value will be
59101
* reported losslessly.
@@ -64,7 +106,7 @@ const makeEachIterator = pubList => {
64106
export const subscribeEach = topic => {
65107
const iterable = Far('EachIterable', {
66108
[Symbol.asyncIterator]: () => {
67-
const pubList = E(topic).subscribeAfter();
109+
const pubList = reconnectAsNeeded(() => E(topic).subscribeAfter());
68110
return makeEachIterator(pubList);
69111
},
70112
});
@@ -95,9 +137,10 @@ const cloneLatestIterator = (topic, localUpdateCount, terminalResult) => {
95137
return terminalResult;
96138
}
97139

98-
// Send the next request now, skipping past intermediate updates.
99-
const { value, updateCount } = await E(topic).getUpdateSince(
100-
localUpdateCount,
140+
// Send the next request now, skipping past intermediate updates
141+
// and upgrade disconnections.
142+
const { value, updateCount } = await reconnectAsNeeded(() =>
143+
E(topic).getUpdateSince(localUpdateCount),
101144
);
102145
// Make sure the next request is for a fresher value.
103146
localUpdateCount = updateCount;
@@ -161,8 +204,9 @@ const makeLatestIterator = topic => cloneLatestIterator(topic);
161204
* By "lossy", we mean that you may miss any published state if a more
162205
* recent published state can be reported instead.
163206
*
164-
* If the underlying PublicationList is terminated, that terminal value will be
165-
* reported losslessly.
207+
* If the underlying PublicationList is terminated by upgrade of the producer,
208+
* it will be re-requested. All other terminal values will be losslessly
209+
* propagated.
166210
*
167211
* @template T
168212
* @param {ERef<LatestTopic<T>>} topic

packages/notifier/test/test-publish-kit.js

+107-34
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,16 @@ const assertTransmission = async (t, publishKit, value, method = 'publish') => {
5050
}
5151
};
5252

53-
const assertCells = (t, label, cells, publishCount, result, options = {}) => {
54-
const { strict = true } = options;
53+
const assertCells = (t, label, cells, publishCount, expected, options = {}) => {
54+
const { strict = true, iterationResults = {} } = options;
5555
const firstCell = cells[0];
5656
t.deepEqual(
5757
Reflect.ownKeys(firstCell).sort(),
5858
['head', 'publishCount', 'tail'],
5959
`${label} cell property keys`,
6060
);
61-
t.deepEqual(firstCell.head, result, `${label} cell result`);
62-
t.is(firstCell.head.value, result.value, `${label} cell value`);
61+
t.deepEqual(firstCell.head, expected, `${label} cell result`);
62+
t.is(firstCell.head.value, expected.value, `${label} cell value`);
6363
// `publishCount` values *should* be considered opaque,
6464
// but de facto they are a gap-free sequence of bigints
6565
// that starts at 1.
@@ -78,6 +78,10 @@ const assertCells = (t, label, cells, publishCount, result, options = {}) => {
7878
t.like(cell, props, `${label} cell ${i + 1} must match cell 0`);
7979
});
8080
}
81+
82+
for (const [resultLabel, result] of Object.entries(iterationResults)) {
83+
t.deepEqual(result, expected, `${label} ${resultLabel} result`);
84+
}
8185
};
8286

8387
// eslint-disable-next-line no-shadow
@@ -270,13 +274,31 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
270274
vatParameters: { version: 'v1' },
271275
},
272276
]);
277+
await run('createVat', [
278+
{
279+
name: 'pubsub2',
280+
bundleCapName: 'pubsub',
281+
},
282+
]);
273283
t.is(
274284
await run('messageVat', [{ name: 'pubsub', methodName: 'getVersion' }]),
275285
'v1',
276286
);
277287
const sub1 = await run('messageVat', [
278288
{ name: 'pubsub', methodName: 'getSubscriber' },
279289
]);
290+
const eachIterable = await run('messageVat', [
291+
{ name: 'pubsub2', methodName: 'subscribeEach', args: [sub1] },
292+
]);
293+
const eachIterator1 = await run('messageVatObject', [
294+
{ presence: eachIterable, methodName: Symbol.asyncIterator },
295+
]);
296+
const latestIterable = await run('messageVat', [
297+
{ name: 'pubsub2', methodName: 'subscribeLatest', args: [sub1] },
298+
]);
299+
const latestIterator1 = await run('messageVatObject', [
300+
{ presence: latestIterable, methodName: Symbol.asyncIterator },
301+
]);
280302

281303
/**
282304
* Advances the publisher.
@@ -290,17 +312,39 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
290312
]);
291313
};
292314

293-
// Verify receipt of a published value.
315+
// Verify receipt of a published value via subscribeAfter
316+
// and async iterators.
294317
const value1 = Symbol.for('value1');
295318
await publish(value1);
319+
const expectedV1FirstResult = { value: value1, done: false };
296320
const v1FirstCell = await run('messageVatObject', [
297321
{ presence: sub1, methodName: 'subscribeAfter' },
298322
]);
299-
assertCells(t, 'v1 first', [v1FirstCell], 1n, { value: value1, done: false });
300-
301-
// Verify receipt of a second published value via tail and subscribeAfter.
323+
assertCells(t, 'v1 first', [v1FirstCell], 1n, expectedV1FirstResult);
324+
const eachIteratorFirstResult = await run('messageVatObject', [
325+
{ presence: eachIterator1, methodName: 'next' },
326+
]);
327+
t.deepEqual(
328+
eachIteratorFirstResult,
329+
expectedV1FirstResult,
330+
'v1 eachIterator first result',
331+
);
332+
// Don't ask the latest iterator for its first result so we can observe
333+
// that it skips intermediate results.
334+
// const latestIteratorFirstResult = await run('messageVatObject', [
335+
// { presence: latestIterator1, methodName: 'next' },
336+
// ]);
337+
// t.deepEqual(
338+
// latestIteratorFirstResult,
339+
// expectedV1FirstResult,
340+
// 'v1 latestIterator first result',
341+
// );
342+
343+
// Verify receipt of a second published value via tail and subscribeAfter
344+
// and async iterators.
302345
const value2 = Symbol.for('value2');
303346
await publish(value2);
347+
const expectedV1SecondResult = { value: value2, done: false };
304348
await run('messageVatObject', [
305349
{ presence: sub1, methodName: 'subscribeAfter' },
306350
]);
@@ -313,14 +357,18 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
313357
{ presence: sub1, methodName: 'subscribeAfter' },
314358
]),
315359
];
316-
assertCells(
317-
t,
318-
'v1 second',
319-
v1SecondCells,
320-
2n,
321-
{ value: value2, done: false },
322-
{ strict: false },
323-
);
360+
const v1SecondIterationResults = {
361+
eachIterator: await run('messageVatObject', [
362+
{ presence: eachIterator1, methodName: 'next' },
363+
]),
364+
latestIterator: await run('messageVatObject', [
365+
{ presence: latestIterator1, methodName: 'next' },
366+
]),
367+
};
368+
assertCells(t, 'v1 second', v1SecondCells, 2n, expectedV1SecondResult, {
369+
strict: false,
370+
iterationResults: v1SecondIterationResults,
371+
});
324372

325373
// Upgrade the vat, breaking promises from v1.
326374
await run('upgradeVat', [
@@ -337,26 +385,47 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
337385
const sub2 = await run('messageVat', [
338386
{ name: 'pubsub', methodName: 'getSubscriber' },
339387
]);
340-
await run('awaitVatObject', [{ presence: v1SecondCells[0].tail }]).then(
341-
(...args) =>
342-
t.deepEqual(args, undefined, 'tail promise of old vat must be rejected'),
343-
failure =>
344-
t.deepEqual(failure, {
345-
incarnationNumber: 1,
346-
name: 'vatUpgraded',
347-
upgradeMessage: 'vat upgraded',
348-
}),
388+
const eachIterator2 = await run('messageVatObject', [
389+
{ presence: eachIterable, methodName: Symbol.asyncIterator },
390+
]);
391+
const assertDisconnection = (p, label) => {
392+
const expected = {
393+
incarnationNumber: 1,
394+
name: 'vatUpgraded',
395+
upgradeMessage: 'vat upgraded',
396+
};
397+
return p.then(
398+
(...args) => t.deepEqual(args, undefined, `${label} must be rejected`),
399+
failure =>
400+
t.deepEqual(failure, expected, `${label} must indicate disconnection`),
401+
);
402+
};
403+
await assertDisconnection(
404+
run('awaitVatObject', [{ presence: v1SecondCells[0].tail }]),
405+
'tail promise of old vat',
406+
);
407+
await assertDisconnection(
408+
run('messageVatObject', [{ presence: eachIterator1, methodName: 'next' }]),
409+
'eachIterator following old vat subscriber',
349410
);
350411

351412
// Verify receipt of the last published value from v1.
352413
const v2FirstCell = await run('messageVatObject', [
353414
{ presence: sub2, methodName: 'subscribeAfter' },
354415
]);
355-
assertCells(t, 'v2 first', [v2FirstCell], 2n, { value: value2, done: false });
416+
const v2FirstIterationResults = {
417+
eachIterator: await run('messageVatObject', [
418+
{ presence: eachIterator2, methodName: 'next' },
419+
]),
420+
};
421+
assertCells(t, 'v2 first', [v2FirstCell], 2n, expectedV1SecondResult, {
422+
iterationResults: v2FirstIterationResults,
423+
});
356424

357425
// Verify receipt of a published value from v2.
358426
const value3 = Symbol.for('value3');
359427
await publish(value3);
428+
const expectedV2SecondResult = { value: value3, done: false };
360429
const v2SecondCells = [
361430
await run('awaitVatObject', [{ presence: v2FirstCell.tail }]),
362431
await run('messageVatObject', [
@@ -366,14 +435,18 @@ test('durable publish kit upgrade trauma (full-vat integration)', async t => {
366435
{ presence: sub2, methodName: 'subscribeAfter' },
367436
]),
368437
];
369-
assertCells(
370-
t,
371-
'v2 second',
372-
v2SecondCells,
373-
3n,
374-
{ value: value3, done: false },
375-
{ strict: false },
376-
);
438+
const v2SecondIterationResults = {
439+
eachIterator: await run('messageVatObject', [
440+
{ presence: eachIterator2, methodName: 'next' },
441+
]),
442+
latestIterator: await run('messageVatObject', [
443+
{ presence: latestIterator1, methodName: 'next' },
444+
]),
445+
};
446+
assertCells(t, 'v2 second', v2SecondCells, 3n, expectedV2SecondResult, {
447+
strict: false,
448+
iterationResults: v2SecondIterationResults,
449+
});
377450
});
378451

379452
// TODO: Find a way to test virtual object rehydration

packages/notifier/test/vat-integration/vat-pubsub.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { Far } from '@endo/marshal';
22
import { provide } from '@agoric/vat-data';
3-
import { prepareDurablePublishKit } from '../../src/index.js';
3+
import {
4+
prepareDurablePublishKit,
5+
subscribeEach,
6+
subscribeLatest,
7+
} from '../../src/index.js';
48

59
export const buildRootObject = (_vatPowers, vatParameters, baggage) => {
610
const makeDurablePublishKit = prepareDurablePublishKit(
@@ -19,6 +23,8 @@ export const buildRootObject = (_vatPowers, vatParameters, baggage) => {
1923
getVersion: () => version,
2024
getParameters: () => vatParameters,
2125
getSubscriber: () => subscriber,
26+
subscribeEach: topic => subscribeEach(topic),
27+
subscribeLatest: topic => subscribeLatest(topic),
2228
makeDurablePublishKit: (...args) => makeDurablePublishKit(...args),
2329
publish: value => publisher.publish(value),
2430
finish: finalValue => publisher.finish(finalValue),

0 commit comments

Comments
 (0)