Skip to content

Commit 4dd076c

Browse files
authored
feat: txpool persistence (AztecProtocol#3672)
This PR adds a new implementation of a `TxPool` based on the kv-store introduced in AztecProtocol#3628. It also amends the p2p-client's bootstrap flow. The p2p-client saves the block number its on to the database and restores it the next time it is started. The initial sync happens as before (it syncs to the the tip of the chain, but instead of syncing from 0, it syncs from the last known block number) and after that, it re-publishes any transactions in its TxPool that haven't been processed already. Fix AztecProtocol#3365
1 parent 2db2e2a commit 4dd076c

21 files changed

+308
-73
lines changed

yarn-project/aztec-node/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
"@aztec/circuits.js": "workspace:^",
3838
"@aztec/ethereum": "workspace:^",
3939
"@aztec/foundation": "workspace:^",
40+
"@aztec/kv-store": "workspace:^",
4041
"@aztec/l1-artifacts": "workspace:^",
4142
"@aztec/merkle-tree": "workspace:^",
4243
"@aztec/p2p": "workspace:^",

yarn-project/aztec-node/src/aztec-node/server.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import { computeGlobalsHash, computePublicDataTreeLeafSlot } from '@aztec/circui
1616
import { L1ContractAddresses, createEthereumChain } from '@aztec/ethereum';
1717
import { AztecAddress } from '@aztec/foundation/aztec-address';
1818
import { createDebugLogger } from '@aztec/foundation/log';
19-
import { InMemoryTxPool, P2P, createP2PClient } from '@aztec/p2p';
19+
import { AztecLmdbStore } from '@aztec/kv-store';
20+
import { AztecKVTxPool, P2P, createP2PClient } from '@aztec/p2p';
2021
import {
2122
GlobalVariableBuilder,
2223
PublicProcessorFactory,
@@ -105,6 +106,7 @@ export class AztecNodeService implements AztecNode {
105106
}
106107

107108
const log = createDebugLogger('aztec:node');
109+
const store = await AztecLmdbStore.create(config.l1Contracts.rollupAddress, config.dataDirectory);
108110
const [nodeDb, worldStateDb] = await openDb(config, log);
109111

110112
// first create and sync the archiver
@@ -116,7 +118,7 @@ export class AztecNodeService implements AztecNode {
116118
config.transactionProtocol = `/aztec/tx/${config.l1Contracts.rollupAddress.toString()}`;
117119

118120
// create the tx pool and the p2p client, which will need the l2 block source
119-
const p2pClient = await createP2PClient(config, new InMemoryTxPool(), archiver);
121+
const p2pClient = await createP2PClient(store, config, new AztecKVTxPool(store), archiver);
120122

121123
// now create the merkle trees and the world state synchronizer
122124
const merkleTrees = await MerkleTrees.new(worldStateDb);

yarn-project/aztec-node/tsconfig.json

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
},
3636
{
3737
"path": "../world-state"
38+
},
39+
{
40+
"path": "../kv-store"
3841
}
3942
],
4043
"include": ["src"]

yarn-project/end-to-end/src/benchmarks/bench_process_history.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ describe('benchmarks/process_history', () => {
4343
// Send enough txs to move the chain to the next block number checkpoint
4444
const txCount = (chainLength - lastBlock) * BLOCK_SIZE;
4545
const sentTxs = await sendTxs(txCount, context, contract);
46-
await sentTxs[sentTxs.length - 1].wait({ timeout: 5 * 60_000 });
46+
await Promise.all(sentTxs.map(tx => tx.wait({ timeout: 5 * 60_000 })));
4747
await sleep(100);
4848

4949
// Create a new node and measure how much time it takes it to sync

yarn-project/end-to-end/src/e2e_block_building.test.ts

+21-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
DebugLogger,
77
Fr,
88
PXE,
9+
SentTx,
10+
TxReceipt,
911
TxStatus,
1012
Wallet,
1113
isContractDeployed,
@@ -72,7 +74,7 @@ describe('e2e_block_building', () => {
7274
expect(areDeployed).toEqual(times(TX_COUNT, () => true));
7375
}, 60_000);
7476

75-
it('can call public function from different tx in same block', async () => {
77+
it.skip('can call public function from different tx in same block', async () => {
7678
// Ensure both txs will land on the same block
7779
await aztecNode.setConfig({ minTxsPerBlock: 2 });
7880

@@ -125,8 +127,7 @@ describe('e2e_block_building', () => {
125127
await call.simulate();
126128
}
127129
const [tx1, tx2] = calls.map(call => call.send());
128-
await tx1.wait();
129-
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
130+
await expectXorTx(tx1, tx2);
130131
}, 30_000);
131132

132133
it('drops tx with public nullifier already emitted on the same block', async () => {
@@ -136,8 +137,7 @@ describe('e2e_block_building', () => {
136137
await call.simulate();
137138
}
138139
const [tx1, tx2] = calls.map(call => call.send());
139-
await tx1.wait();
140-
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
140+
await expectXorTx(tx1, tx2);
141141
}, 30_000);
142142

143143
it('drops tx with two equal nullifiers', async () => {
@@ -160,8 +160,22 @@ describe('e2e_block_building', () => {
160160
await call.simulate();
161161
}
162162
const [tx1, tx2] = calls.map(call => call.send());
163-
await tx1.wait();
164-
await expect(tx2.wait()).rejects.toThrowError(/dropped/);
163+
await expectXorTx(tx1, tx2);
165164
});
166165
});
167166
});
167+
168+
/**
169+
* Checks that only one of the two provided transactions succeeds.
170+
* @param tx1 - A transaction.
171+
* @param tx2 - Another transaction.
172+
*/
173+
async function expectXorTx(tx1: SentTx, tx2: SentTx) {
174+
const receipts = await Promise.allSettled([tx1.wait(), tx2.wait()]);
175+
const succeeded = receipts.find((r): r is PromiseSettledResult<TxReceipt> => r.status === 'fulfilled');
176+
const failed = receipts.find((r): r is PromiseRejectedResult => r.status === 'rejected');
177+
178+
expect(succeeded).toBeDefined();
179+
expect(failed).toBeDefined();
180+
expect((failed?.reason as Error).message).toMatch(/dropped/);
181+
}

yarn-project/p2p/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"dependencies": {
3535
"@aztec/circuits.js": "workspace:^",
3636
"@aztec/foundation": "workspace:^",
37+
"@aztec/kv-store": "workspace:^",
3738
"@aztec/types": "workspace:^",
3839
"@chainsafe/libp2p-noise": "^13.0.0",
3940
"@chainsafe/libp2p-yamux": "^5.0.0",

yarn-project/p2p/src/client/index.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { AztecKVStore } from '@aztec/kv-store';
12
import { L2BlockSource } from '@aztec/types';
23

34
import { P2PClient } from '../client/p2p_client.js';
@@ -8,7 +9,12 @@ import { TxPool } from '../tx_pool/index.js';
89

910
export * from './p2p_client.js';
1011

11-
export const createP2PClient = async (config: P2PConfig, txPool: TxPool, l2BlockSource: L2BlockSource) => {
12+
export const createP2PClient = async (
13+
store: AztecKVStore,
14+
config: P2PConfig,
15+
txPool: TxPool,
16+
l2BlockSource: L2BlockSource,
17+
) => {
1218
const p2pService = config.p2pEnabled ? await LibP2PService.new(config, txPool) : new DummyP2PService();
13-
return new P2PClient(l2BlockSource, txPool, p2pService);
19+
return new P2PClient(store, l2BlockSource, txPool, p2pService);
1420
};

yarn-project/p2p/src/client/p2p_client.test.ts

+27-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { EthAddress } from '@aztec/circuits.js';
2+
import { AztecKVStore, AztecLmdbStore } from '@aztec/kv-store';
13
import { L2BlockSource, mockTx } from '@aztec/types';
24

35
import { expect, jest } from '@jest/globals';
@@ -18,8 +20,10 @@ describe('In-Memory P2P Client', () => {
1820
let txPool: Mockify<TxPool>;
1921
let blockSource: L2BlockSource;
2022
let p2pService: Mockify<P2PService>;
23+
let kvStore: AztecKVStore;
24+
let client: P2PClient;
2125

22-
beforeEach(() => {
26+
beforeEach(async () => {
2327
txPool = {
2428
addTxs: jest.fn(),
2529
getTxByHash: jest.fn().mockReturnValue(undefined),
@@ -37,10 +41,12 @@ describe('In-Memory P2P Client', () => {
3741
};
3842

3943
blockSource = new MockBlockSource();
44+
45+
kvStore = await AztecLmdbStore.create(EthAddress.random());
46+
client = new P2PClient(kvStore, blockSource, txPool, p2pService);
4047
});
4148

4249
it('can start & stop', async () => {
43-
const client = new P2PClient(blockSource, txPool, p2pService);
4450
expect(await client.isReady()).toEqual(false);
4551

4652
await client.start();
@@ -51,7 +57,6 @@ describe('In-Memory P2P Client', () => {
5157
});
5258

5359
it('adds txs to pool', async () => {
54-
const client = new P2PClient(blockSource, txPool, p2pService);
5560
await client.start();
5661
const tx1 = mockTx();
5762
const tx2 = mockTx();
@@ -63,7 +68,6 @@ describe('In-Memory P2P Client', () => {
6368
});
6469

6570
it('rejects txs after being stopped', async () => {
66-
const client = new P2PClient(blockSource, txPool, p2pService);
6771
await client.start();
6872
const tx1 = mockTx();
6973
const tx2 = mockTx();
@@ -76,4 +80,23 @@ describe('In-Memory P2P Client', () => {
7680
await expect(client.sendTx(tx3)).rejects.toThrow();
7781
expect(txPool.addTxs).toHaveBeenCalledTimes(2);
7882
});
83+
84+
it('republishes previously stored txs on start', async () => {
85+
const tx1 = mockTx();
86+
const tx2 = mockTx();
87+
txPool.getAllTxs.mockReturnValue([tx1, tx2]);
88+
89+
await client.start();
90+
expect(p2pService.propagateTx).toHaveBeenCalledTimes(2);
91+
expect(p2pService.propagateTx).toHaveBeenCalledWith(tx1);
92+
expect(p2pService.propagateTx).toHaveBeenCalledWith(tx2);
93+
});
94+
95+
it('restores the previous block number it was at', async () => {
96+
await client.start();
97+
await client.stop();
98+
99+
const client2 = new P2PClient(kvStore, blockSource, txPool, p2pService);
100+
expect(client2.getSyncedBlockNum()).toEqual(client.getSyncedBlockNum());
101+
});
79102
});

yarn-project/p2p/src/client/p2p_client.ts

+41-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
import { createDebugLogger } from '@aztec/foundation/log';
2-
import { L2Block, L2BlockContext, L2BlockDownloader, L2BlockSource, Tx, TxHash } from '@aztec/types';
2+
import { AztecKVStore, AztecSingleton } from '@aztec/kv-store';
3+
import {
4+
INITIAL_L2_BLOCK_NUM,
5+
L2Block,
6+
L2BlockContext,
7+
L2BlockDownloader,
8+
L2BlockSource,
9+
Tx,
10+
TxHash,
11+
} from '@aztec/types';
312

413
import { getP2PConfigEnvVars } from '../config.js';
514
import { P2PService } from '../service/service.js';
@@ -102,31 +111,30 @@ export class P2PClient implements P2P {
102111
*/
103112
private runningPromise!: Promise<void>;
104113

105-
/**
106-
* Store the ID of the latest block the client has synced to.
107-
*/
108-
private currentL2BlockNum = 0;
109-
110114
private currentState = P2PClientState.IDLE;
111115
private syncPromise = Promise.resolve();
112116
private latestBlockNumberAtStart = -1;
113117
private syncResolve?: () => void = undefined;
118+
private synchedBlockNumber: AztecSingleton<number>;
114119

115120
/**
116121
* In-memory P2P client constructor.
122+
* @param store - The client's instance of the KV store.
117123
* @param l2BlockSource - P2P client's source for fetching existing blocks.
118124
* @param txPool - The client's instance of a transaction pool. Defaults to in-memory implementation.
119125
* @param p2pService - The concrete instance of p2p networking to use.
120126
* @param log - A logger.
121127
*/
122128
constructor(
129+
store: AztecKVStore,
123130
private l2BlockSource: L2BlockSource,
124131
private txPool: TxPool,
125132
private p2pService: P2PService,
126133
private log = createDebugLogger('aztec:p2p'),
127134
) {
128135
const { p2pBlockCheckIntervalMS: checkInterval, l2QueueSize } = getP2PConfigEnvVars();
129136
this.blockDownloader = new L2BlockDownloader(l2BlockSource, l2QueueSize, checkInterval);
137+
this.synchedBlockNumber = store.createSingleton('p2p_pool_last_l2_block');
130138
}
131139

132140
/**
@@ -144,7 +152,7 @@ export class P2PClient implements P2P {
144152
// get the current latest block number
145153
this.latestBlockNumberAtStart = await this.l2BlockSource.getBlockNumber();
146154

147-
const blockToDownloadFrom = this.currentL2BlockNum + 1;
155+
const blockToDownloadFrom = this.getSyncedBlockNum() + 1;
148156

149157
// if there are blocks to be retrieved, go to a synching state
150158
if (blockToDownloadFrom <= this.latestBlockNumberAtStart) {
@@ -161,6 +169,9 @@ export class P2PClient implements P2P {
161169
this.log(`Next block ${blockToDownloadFrom} already beyond latest block at ${this.latestBlockNumberAtStart}`);
162170
}
163171

172+
// publish any txs in TxPool after its doing initial sync
173+
this.syncPromise = this.syncPromise.then(() => this.publishStoredTxs());
174+
164175
// start looking for further blocks
165176
const blockProcess = async () => {
166177
while (!this.stopping) {
@@ -171,6 +182,7 @@ export class P2PClient implements P2P {
171182
this.runningPromise = blockProcess();
172183
this.blockDownloader.start(blockToDownloadFrom);
173184
this.log(`Started block downloader from block ${blockToDownloadFrom}`);
185+
174186
return this.syncPromise;
175187
}
176188

@@ -229,7 +241,7 @@ export class P2PClient implements P2P {
229241
if (!ready) {
230242
throw new Error('P2P client not ready');
231243
}
232-
this.txPool.deleteTxs(txHashes);
244+
await this.txPool.deleteTxs(txHashes);
233245
}
234246

235247
/**
@@ -245,7 +257,7 @@ export class P2PClient implements P2P {
245257
* @returns Block number of latest L2 Block we've synced with.
246258
*/
247259
public getSyncedBlockNum() {
248-
return this.currentL2BlockNum;
260+
return this.synchedBlockNumber.get() ?? INITIAL_L2_BLOCK_NUM - 1;
249261
}
250262

251263
/**
@@ -255,7 +267,7 @@ export class P2PClient implements P2P {
255267
public getStatus(): Promise<P2PSyncState> {
256268
return Promise.resolve({
257269
state: this.currentState,
258-
syncedToL2Block: this.currentL2BlockNum,
270+
syncedToL2Block: this.getSyncedBlockNum(),
259271
} as P2PSyncState);
260272
}
261273

@@ -264,14 +276,13 @@ export class P2PClient implements P2P {
264276
* @param blocks - A list of existing blocks with txs that the P2P client needs to ensure the tx pool is reconciled with.
265277
* @returns Empty promise.
266278
*/
267-
private reconcileTxPool(blocks: L2Block[]): Promise<void> {
279+
private async reconcileTxPool(blocks: L2Block[]): Promise<void> {
268280
for (let i = 0; i < blocks.length; i++) {
269281
const blockContext = new L2BlockContext(blocks[i]);
270282
const txHashes = blockContext.getTxHashes();
271-
this.txPool.deleteTxs(txHashes);
283+
await this.txPool.deleteTxs(txHashes);
272284
this.p2pService.settledTxs(txHashes);
273285
}
274-
return Promise.resolve();
275286
}
276287

277288
/**
@@ -284,9 +295,11 @@ export class P2PClient implements P2P {
284295
return Promise.resolve();
285296
}
286297
await this.reconcileTxPool(blocks);
287-
this.currentL2BlockNum = blocks[blocks.length - 1].number;
288-
this.log(`Synched to block ${this.currentL2BlockNum}`);
289-
if (this.currentState === P2PClientState.SYNCHING && this.currentL2BlockNum >= this.latestBlockNumberAtStart) {
298+
const lastBlockNum = blocks[blocks.length - 1].number;
299+
await this.synchedBlockNumber.set(lastBlockNum);
300+
this.log(`Synched to block ${lastBlockNum}`);
301+
302+
if (this.currentState === P2PClientState.SYNCHING && lastBlockNum >= this.latestBlockNumberAtStart) {
290303
this.setCurrentState(P2PClientState.RUNNING);
291304
if (this.syncResolve !== undefined) {
292305
this.syncResolve();
@@ -303,4 +316,16 @@ export class P2PClient implements P2P {
303316
this.currentState = newState;
304317
this.log(`Moved to state ${P2PClientState[this.currentState]}`);
305318
}
319+
320+
private async publishStoredTxs() {
321+
if (!this.isReady()) {
322+
return;
323+
}
324+
325+
const txs = this.txPool.getAllTxs();
326+
if (txs.length > 0) {
327+
this.log(`Publishing ${txs.length} previously stored txs`);
328+
await Promise.all(txs.map(tx => this.p2pService.propagateTx(tx)));
329+
}
330+
}
306331
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { EthAddress } from '@aztec/circuits.js';
2+
import { AztecLmdbStore } from '@aztec/kv-store';
3+
4+
import { AztecKVTxPool } from './aztec_kv_tx_pool.js';
5+
import { describeTxPool } from './tx_pool_test_suite.js';
6+
7+
describe('In-Memory TX pool', () => {
8+
let txPool: AztecKVTxPool;
9+
beforeEach(async () => {
10+
txPool = new AztecKVTxPool(await AztecLmdbStore.create(EthAddress.random()));
11+
});
12+
13+
describeTxPool(() => txPool);
14+
});

0 commit comments

Comments
 (0)