Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: request specific transactions through the p2p layer #8185

Merged
merged 18 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions yarn-project/circuit-types/src/p2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ export * from './interface.js';
export * from './gossipable.js';
export * from './topic_type.js';
export * from './signature.js';
// export * from "./respondable.js";
// export * from "./requestable.js";
33 changes: 31 additions & 2 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type L2BlockSource } from '@aztec/circuit-types';
import { type L2BlockSource, TxHash } from '@aztec/circuit-types';
import { type AztecKVStore } from '@aztec/kv-store';

import { type AttestationPool } from '../attestation_pool/attestation_pool.js';
Expand All @@ -7,6 +7,8 @@ import { type P2PConfig } from '../config.js';
import { DiscV5Service } from '../service/discV5_service.js';
import { DummyP2PService } from '../service/dummy_service.js';
import { LibP2PService, createLibP2PPeerId } from '../service/index.js';
import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import { PING_PROTOCOL, STATUS_PROTOCOL, TX_REQ_PROTOCOL } from '../service/reqresp/interface.js';
import { type TxPool } from '../tx_pool/index.js';
import { getPublicIp, resolveAddressIfNecessary, splitAddressPort } from '../util.js';

Expand All @@ -23,6 +25,9 @@ export const createP2PClient = async (

if (config.p2pEnabled) {
// If announceTcpAddress or announceUdpAddress are not provided, query for public IP if config allows

// TODO: move create libp2p2 client INTO the p2p client constructor?????
// WHat is the advantage to defining here and passing it in?
const {
tcpAnnounceAddress: configTcpAnnounceAddress,
udpAnnounceAddress: configUdpAnnounceAddress,
Expand Down Expand Up @@ -68,7 +73,31 @@ export const createP2PClient = async (
// Create peer discovery service
const peerId = await createLibP2PPeerId(config.peerIdPrivateKey);
const discoveryService = new DiscV5Service(peerId, config);
p2pService = await LibP2PService.new(config, discoveryService, peerId, txPool, attestationsPool, store);

// TODO: this must go somewhere else - AHHHHHHHHHHH - this whole thing needs a layercaking
const txHandler = (msg: Buffer): Promise<Uint8Array> => {
const txHash = TxHash.fromBuffer(msg);
const foundTx = txPool.getTxByHash(txHash);
const asUint8Array = Uint8Array.from(foundTx ? foundTx.toBuffer() : Buffer.alloc(0));
return Promise.resolve(asUint8Array);
};

const requestResponseHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: txHandler,
};

// TODO: pass the reqresp handlers in here - using callbacks for the proof of concept
p2pService = await LibP2PService.new(
config,
discoveryService,
peerId,
txPool,
attestationsPool,
store,
requestResponseHandlers,
);
} else {
p2pService = new DummyP2PService();
}
Expand Down
1 change: 1 addition & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ describe('In-Memory P2P Client', () => {
stop: jest.fn(),
propagate: jest.fn(),
registerBlockReceivedCallback: jest.fn(),
sendRequest: jest.fn(),
getEnr: jest.fn(),
};

Expand Down
12 changes: 12 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { type ENR } from '@chainsafe/enr';

import { type AttestationPool } from '../attestation_pool/attestation_pool.js';
import { getP2PConfigEnvVars } from '../config.js';
import { TX_REQ_PROTOCOL } from '../service/reqresp/interface.js';
import type { P2PService } from '../service/service.js';
import { type TxPool } from '../tx_pool/index.js';

Expand Down Expand Up @@ -71,6 +72,12 @@ export interface P2P {
// ^ This pattern is not my favorite (md)
registerBlockProposalHandler(handler: (block: BlockProposal) => Promise<BlockAttestation>): void;

/**
* Request a transaction from another peer by its tx hash.
* @param txHash - Hash of the tx to query.
*/
requestTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The transaction.
Expand Down Expand Up @@ -276,6 +283,11 @@ export class P2PClient implements P2P {
this.p2pService.registerBlockReceivedCallback(handler);
}

public requestTxByHash(txHash: TxHash): Promise<Tx | undefined> {
// Underlying I want to use the libp2p service to just have a request method where the subprotocol is defined here
return this.p2pService.sendRequest(TX_REQ_PROTOCOL, txHash);
}

/**
* Returns all transactions in the transaction pool.
* @returns An array of Txs.
Expand Down
96 changes: 96 additions & 0 deletions yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { noise } from '@chainsafe/libp2p-noise';
import { yamux } from '@chainsafe/libp2p-yamux';
import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import { PING_PROTOCOL, STATUS_PROTOCOL, TX_REQ_PROTOCOL } from '../service/reqresp/interface.js';
import { ReqResp, type SubProtocolHandlers } from '../service/reqresp/reqresp.js';

/**
* Creates a libp2p node, pre configured.
* @param boostrapAddrs - an optional list of bootstrap addresses
* @returns Lip2p node
*/
export async function createLibp2pNode(boostrapAddrs: string[] = []): Promise<Libp2p> {
const options: Libp2pOptions = {
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
},
connectionEncryption: [noise()],
streamMuxers: [yamux()],
transports: [tcp()],
};

if (boostrapAddrs.length > 0) {
options.peerDiscovery = [
bootstrap({
list: boostrapAddrs,
}),
];
}

return await createLibp2p(options);
}

/**
* A p2p / req resp node pairing the req node will always contain the p2p node.
* they are provided as a pair to allow access the p2p node directly
*/
export type ReqRespNode = {
p2p: Libp2p;
req: ReqResp;
};

// handlers
export const SUB_PROTOCOL_HANDLERS: SubProtocolHandlers = {
[PING_PROTOCOL]: pingHandler,
[STATUS_PROTOCOL]: statusHandler,
[TX_REQ_PROTOCOL]: (_msg: any) => Promise.resolve(Uint8Array.from(Buffer.from('tx'))),
};

/**
* @param numberOfNodes - the number of nodes to create
* @returns An array of the created nodes
*/
export const createNodes = async (numberOfNodes: number): Promise<ReqRespNode[]> => {
return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp()));
};

// TODO: think about where else this can go
export const startNodes = async (nodes: ReqRespNode[], subProtocolHandlers = SUB_PROTOCOL_HANDLERS) => {
for (const node of nodes) {
await node.req.start(subProtocolHandlers);
}
};

export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
await node.req.stop();
await node.p2p.stop();
}
};

// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
return {
p2p,
req,
};
};

// Given a node list; hand shake all of the nodes with each other
export const connectToPeers = async (nodes: ReqRespNode[]): Promise<void> => {
for (const node of nodes) {
for (const otherNode of nodes) {
if (node === otherNode) {
continue;
}
const addr = otherNode.p2p.getMultiaddrs()[0];
await node.p2p.dial(addr);
}
}
};
18 changes: 18 additions & 0 deletions yarn-project/p2p/src/service/dummy_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec
import type { PeerId } from '@libp2p/interface';
import EventEmitter from 'events';

import { type SubProtocol, type SubProtocolMap } from './reqresp/interface.js';
import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from './service.js';

/**
Expand Down Expand Up @@ -42,6 +43,23 @@ export class DummyP2PService implements P2PService {
*/
public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise<BlockAttestation>) {}

/**
* Sends a request to a peer.
* @param _protocol - The protocol to send the request on.
* @param _request - The request to send.
* @returns The response from the peer, otherwise undefined.
*/
public sendRequest<Protocol extends SubProtocol>(
_protocol: Protocol,
_request: InstanceType<SubProtocolMap[Protocol]['request']>,
): Promise<InstanceType<SubProtocolMap[Protocol]['response']> | undefined> {
return Promise.resolve(undefined);
}

/**
* Returns the ENR of the peer.
* @returns The ENR of the peer, otherwise undefined.
*/
public getEnr(): undefined {
return undefined;
}
Expand Down
43 changes: 42 additions & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import { type TxPool } from '../tx_pool/index.js';
import { convertToMultiaddr } from '../util.js';
import { AztecDatastore } from './data_store.js';
import { PeerManager } from './peer_manager.js';
import { type SubProtocol, type SubProtocolMap, subProtocolMap } from './reqresp/interface.js';
import { DEFAULT_SUB_PROTOCOL_HANDLERS, ReqResp, type SubProtocolHandlers } from './reqresp/reqresp.js';
import type { P2PService, PeerDiscoveryService } from './service.js';

export interface PubSubLibp2p extends Libp2p {
Expand Down Expand Up @@ -61,18 +63,26 @@ export class LibP2PService implements P2PService {
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;

// Request and response sub service
private reqresp: ReqResp;

private blockReceivedCallback: (block: BlockProposal) => Promise<BlockAttestation | undefined>;
// private reqRespHandlers: SubProtocolHandlers | undefined;

constructor(
private config: P2PConfig,
private node: PubSubLibp2p,
private peerDiscoveryService: PeerDiscoveryService,
private txPool: TxPool,
private attestationPool: AttestationPool,
private requestResponseHandlers: SubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS,
private logger = createDebugLogger('aztec:libp2p_service'),
) {
this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);

// TODO: will handlers get passed in here?
this.reqresp = new ReqResp(node);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
`[WARNING] handler not yet registered: Block received callback not set. Received block ${block.p2pMessageIdentifier()} from peer.`,
Expand Down Expand Up @@ -124,6 +134,7 @@ export class LibP2PService implements P2PService {
this.peerManager.discover();
}, this.config.peerCheckIntervalMS);
this.discoveryRunningPromise.start();
await this.reqresp.start(this.requestResponseHandlers);
}

/**
Expand All @@ -140,6 +151,9 @@ export class LibP2PService implements P2PService {
this.logger.debug('Stopping LibP2P...');
await this.stopLibP2P();
this.logger.info('LibP2P service stopped');
this.logger.debug('Stopping request response service...');
await this.reqresp.stop();
this.logger.debug('Request response service stopped...');
}

/**
Expand All @@ -155,6 +169,7 @@ export class LibP2PService implements P2PService {
txPool: TxPool,
attestationPool: AttestationPool,
store: AztecKVStore,
requestResponseHandlers: SubProtocolHandlers,
) {
const { tcpListenAddress, tcpAnnounceAddress, minPeerCount, maxPeerCount } = config;
const bindAddrTcp = convertToMultiaddr(tcpListenAddress, 'tcp');
Expand Down Expand Up @@ -206,7 +221,28 @@ export class LibP2PService implements P2PService {
},
});

return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool);
return new LibP2PService(config, node, peerDiscoveryService, txPool, attestationPool, requestResponseHandlers);
}

/**
*
* @param protocol The request response protocol to use
* @param request The request type to send
* @returns
*/
async sendRequest<Protocol extends SubProtocol>(
protocol: Protocol,
request: InstanceType<SubProtocolMap[Protocol]['request']>,
): Promise<InstanceType<SubProtocolMap[Protocol]['response']> | undefined> {
const pair = subProtocolMap[protocol];

// TODO: Can the type be retreived from a mapping based on the subprotocol
const res = await this.reqresp.sendRequest(protocol, request.toBuffer());
if (!res) {
return undefined;
}

return pair.response.fromBuffer(res!);
}

public getEnr(): ENR | undefined {
Expand All @@ -218,6 +254,11 @@ export class LibP2PService implements P2PService {
this.logger.verbose('Block received callback registered');
}

// public registerRequestResponseHandlers(handlers: SubProtocolHandlers) {
// this.reqRespHandlers = handlers;
// this.logger.verbose("Request response protocol handlers registered");
// }

/**
* Subscribes to a topic.
* @param topic - The topic to subscribe to.
Expand Down
7 changes: 7 additions & 0 deletions yarn-project/p2p/src/service/reqresp/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function pingHandler(_msg: any) {
return Promise.resolve(Uint8Array.from(Buffer.from('pong')));
}

export function statusHandler(_msg: any) {
return Promise.resolve(Uint8Array.from(Buffer.from('ok')));
}
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/service/reqresp/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Request Response protocol allows nodes to ask their peers for data
* that they missed via the traditional gossip protocol.
*/
Loading
Loading