Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request specific transactions through the p2p layer #8185

Merged
merged 18 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export const createP2PClient = async (

if (config.p2pEnabled) {
// If announceTcpAddress or announceUdpAddress are not provided, query for public IP if config allows

const {
tcpAnnounceAddress: configTcpAnnounceAddress,
udpAnnounceAddress: configUdpAnnounceAddress,
Expand Down Expand Up @@ -68,6 +69,7 @@ export const createP2PClient = async (
// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
const discoveryService = new DiscV5Service(peerId, config);

p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, attestationsPool, store);
} else {
p2pService = new DummyP2PService();
Expand Down
1 change: 1 addition & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('In-Memory P2P Client', () => {
stop: jest.fn(),
propagate: jest.fn(),
registerBlockReceivedCallback: jest.fn(),
sendRequest: jest.fn(),
getEnr: jest.fn(),
};

Expand Down
12 changes: 12 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { type ENR } from '@chainsafe/enr';

import { type AttestationPool } from '../attestation_pool/attestation_pool.js';
import { getP2PConfigEnvVars } from '../config.js';
import { TX_REQ_PROTOCOL } from '../service/reqresp/interface.js';
import type { P2PService } from '../service/service.js';
import { type TxPool } from '../tx_pool/index.js';

Expand Down Expand Up @@ -71,6 +72,12 @@ export interface P2P {
// ^ This pattern is not my favorite (md)
registerBlockProposalHandler(handler: (block: BlockProposal) => Promise<BlockAttestation>): void;

/**
* Request a transaction from another peer by its tx hash.
* @param txHash - Hash of the tx to query.
*/
requestTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The transaction.
Expand Down Expand Up @@ -276,6 +283,11 @@ export class P2PClient implements P2P {
this.p2pService.registerBlockReceivedCallback(handler);
}

public requestTxByHash(txHash: TxHash): Promise<Tx | undefined> {
// Underlying I want to use the libp2p service to just have a request method where the subprotocol is defined here
return this.p2pService.sendRequest(TX_REQ_PROTOCOL, txHash);
}

/**
* Returns all transactions in the transaction pool.
* @returns An array of Txs.
Expand Down
101 changes: 101 additions & 0 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import {
PING_PROTOCOL,
type ReqRespSubProtocolHandlers,
STATUS_PROTOCOL,
TX_REQ_PROTOCOL,
} from '../service/reqresp/interface.js';
import { ReqResp } from '../service/reqresp/reqresp.js';

/**
* Creates a libp2p node, pre configured.
* @param boostrapAddrs - an optional list of bootstrap addresses
* @returns Lip2p node
*/
export async function createLibp2pNode(boostrapAddrs: string[] = []): Promise<Libp2p> {
const options: Libp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
transports: [tcp()],
};

if (boostrapAddrs.length > 0) {
options.peerDiscovery = [
bootstrap({
list: boostrapAddrs,
}),
];
}

return await createLibp2p(options);
}

/**
* A p2p / req resp node pairing the req node will always contain the p2p node.
* they are provided as a pair to allow access the p2p node directly
*/
export type ReqRespNode = {
p2p: Libp2p;
req: ReqResp;
};

// Mock sub protocol handlers
export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
};

// TODO: think about where else this can go
export const startNodes = async (nodes: ReqRespNode[], subProtocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS) => {
for (const node of nodes) {
await node.req.start(subProtocolHandlers);
}
};

export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
await node.req.stop();
await node.p2p.stop();
}
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
return {
p2p,
req,
};
};

// Given a node list; hand shake all of the nodes with each other
export const connectToPeers = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
for (const otherNode of nodes) {
if (node === otherNode) {
continue;
}
const addr = otherNode.p2p.getMultiaddrs()[0];
await node.p2p.dial(addr);
}
}
};
18 changes: 18 additions & 0 deletions yarn-project/p2p/src/service/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec
import type { PeerId } from '@libp2p/interface';
import EventEmitter from 'events';

import { type ReqRespSubProtocol, type SubProtocolMap } from './reqresp/interface.js';
import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js';

/**
Expand Down Expand Up @@ -42,6 +43,23 @@ export class DummyP2PService implements P2PService {
*/
public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise<BlockAttestation>) {}

/**
* Sends a request to a peer.
* @param _protocol - The protocol to send the request on.
* @param _request - The request to send.
* @returns The response from the peer, otherwise undefined.
*/
public sendRequest<Protocol extends ReqRespSubProtocol>(
_protocol: Protocol,
_request: InstanceType<SubProtocolMap[Protocol]['request']>,
): Promise<InstanceType<SubProtocolMap[Protocol]['response']> | undefined> {
return Promise.resolve(undefined);
}

/**
* Returns the ENR of the peer.
* @returns The ENR of the peer, otherwise undefined.
*/
public getEnr(): undefined {
return undefined;
}
Expand Down
76 changes: 75 additions & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
TopicType,
TopicTypeMap,
Tx,
TxHash,
} from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
Expand All @@ -30,6 +31,18 @@ import { type TxPool } from '../tx_pool/index.js';
import { convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { PeerManager } from './peer_manager.js';
import { pingHandler, statusHandler } from './reqresp/handlers.js';
import {
DEFAULT_SUB_PROTOCOL_HANDLERS,
PING_PROTOCOL,
type ReqRespSubProtocol,
type ReqRespSubProtocolHandlers,
STATUS_PROTOCOL,
type SubProtocolMap,
TX_REQ_PROTOCOL,
subProtocolMap,
} from './reqresp/interface.js';
import { ReqResp } from './reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from './service.js';

export interface PubSubLibp2p extends Libp2p {
Expand Down Expand Up @@ -61,6 +74,14 @@ export class LibP2PService implements P2PService {
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;

// Request and response sub service
private reqresp: ReqResp;

/**
* Callback for when a block is received from a peer.
* @param block - The block received from the peer.
* @returns The attestation for the block, if any.
*/
private blockReceivedCallback: (block: BlockProposal) => Promise<BlockAttestation | undefined>;

constructor(
Expand All @@ -69,9 +90,11 @@ export class LibP2PService implements P2PService {
private peerDiscoveryService: PeerDiscoveryService,
private txPool: TxPool,
private attestationPool: AttestationPool,
private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
private logger = createDebugLogger('aztec:libp2p_service'),
) {
this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.reqresp = new ReqResp(node);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
Expand Down Expand Up @@ -124,6 +147,7 @@ export class LibP2PService implements P2PService {
this.peerManager.discover();
}, this.config.peerCheckIntervalMS);
this.discoveryRunningPromise.start();
await this.reqresp.start(this.requestResponseHandlers);
}

/**
Expand All @@ -140,6 +164,9 @@ export class LibP2PService implements P2PService {
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
this.logger.debug('Stopping request response service...');
await this.reqresp.stop();
this.logger.debug('Request response service stopped...');
}

/**
Expand Down Expand Up @@ -206,9 +233,56 @@ export class LibP2PService implements P2PService {
},
});

return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool);
// Create request response protocol handlers
/**
* Handler for tx requests
* @param msg - the tx request message
* @returns the tx response message
*/
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
};

const requestResponseHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: txHandler,
};

return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool, requestResponseHandlers);
}

/**
* Send Request via the ReqResp service
* The subprotocol defined will determine the request and response types
*
* See the subProtocolMap for the mapping of subprotocols to request/response types in `interface.ts`
*
* @param protocol The request response protocol to use
* @param request The request type to send
* @returns
*/
async sendRequest<SubProtocol extends ReqRespSubProtocol>(
protocol: SubProtocol,
request: InstanceType<SubProtocolMap[SubProtocol]['request']>,
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined> {
const pair = subProtocolMap[protocol];

const res = await this.reqresp.sendRequest(protocol, request.toBuffer());
if (!res) {
return undefined;
}

return pair.response.fromBuffer(res!);
}

/**
* Get the ENR of the node
* @returns The ENR of the node
*/
public getEnr(): ENR | undefined {
return this.peerDiscoveryService.getEnr();
}
Expand Down
18 changes: 14 additions & 4 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
export function pingHandler(_msg: any) {
return Uint8Array.from(Buffer.from('pong'));
/**
* Handles the ping request.
* @param _msg - The ping request message.
* @returns A resolved promise with the pong response.
*/
export function pingHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
}

export function statusHandler(_msg: any) {
return Uint8Array.from(Buffer.from('ok'));
/**
* Handles the status request.
* @param _msg - The status request message.
* @returns A resolved promise with the ok response.
*/
export function statusHandler(_msg: any): Promise<Uint8Array> {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
}
Loading
Loading