Skip to content

Commit

Permalink
quic: primary-to-primary (MystenLabs#668)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill authored Sep 2, 2022
1 parent f6a129c commit b76487d
Show file tree
Hide file tree
Showing 21 changed files with 834 additions and 416 deletions.
4 changes: 4 additions & 0 deletions narwhal/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ tokio-util = { version = "0.7.3", features = ["codec"] }
tonic = { version = "0.7.2", features = ["tls"] }
tracing = "0.1.36"
types = { path = "../types" }
crypto = { path = "../crypto" }

serde = "1.0.144"
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "9deac015fbf66e24b6da9699630e06750eaa094a" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
eyre = "0.6.8"

anemo = { git = "https://github.com/mystenlabs/anemo.git", rev = "6278d0fa78147a49ff2cb9dd2e45e763886be0a0" }
anyhow = "1.0.62"

[dev-dependencies]
bincode = "1.3.3"
test_utils = { path = "../test_utils" }
26 changes: 25 additions & 1 deletion narwhal/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ pub use crate::{
bounded_executor::BoundedExecutor,
primary::{PrimaryNetwork, PrimaryToWorkerNetwork},
retry::RetryConfig,
traits::{LuckyNetwork, ReliableNetwork, UnreliableNetwork},
traits::{
Lucky, LuckyNetwork, LuckyNetwork2, ReliableNetwork, ReliableNetwork2, UnreliableNetwork,
UnreliableNetwork2,
},
worker::{WorkerNetwork, WorkerToPrimaryNetwork},
};

Expand Down Expand Up @@ -58,3 +61,24 @@ impl<T> std::future::Future for CancelOnDropHandler<T> {
// The exact number here probably isn't important; the key thing is that it should be finite so
// that we don't create unbounded numbers of tasks.
pub const MAX_TASK_CONCURRENCY: usize = 500;

/// Converts a [`multiaddr::Multiaddr`] into an [`anemo::types::Address`].
///
/// Only the first two components of `multiaddr` are inspected: a host (`ip4`, `ip6` or `dns`)
/// followed by a `tcp` or `udp` port. Any further components are ignored, and `tcp` and `udp`
/// are treated alike: only the port number is carried over into the resulting address.
///
/// # Errors
///
/// Returns an error that names the offending multiaddr when it does not start with one of the
/// supported host/port combinations.
pub fn multiaddr_to_address(
    multiaddr: &multiaddr::Multiaddr,
) -> anyhow::Result<anemo::types::Address> {
    use multiaddr::Protocol;
    let mut iter = multiaddr.iter();

    match (iter.next(), iter.next()) {
        (Some(Protocol::Ip4(ipaddr)), Some(Protocol::Tcp(port) | Protocol::Udp(port))) => {
            Ok((ipaddr, port).into())
        }
        (Some(Protocol::Ip6(ipaddr)), Some(Protocol::Tcp(port) | Protocol::Udp(port))) => {
            Ok((ipaddr, port).into())
        }
        (Some(Protocol::Dns(hostname)), Some(Protocol::Tcp(port) | Protocol::Udp(port))) => {
            Ok((hostname.as_ref(), port).into())
        }

        // Report the rejected address so a misconfiguration can be diagnosed from the error
        // alone.
        _ => Err(anyhow::anyhow!("invalid address: {multiaddr}")),
    }
}
280 changes: 119 additions & 161 deletions narwhal/network/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,182 +3,26 @@

use crate::{
metrics::{Metrics, PrimaryNetworkMetrics},
traits::{BaseNetwork, LuckyNetwork, ReliableNetwork, UnreliableNetwork},
BoundedExecutor, CancelOnDropHandler, MessageResult, RetryConfig, MAX_TASK_CONCURRENCY,
traits::{BaseNetwork, Lucky, ReliableNetwork2, UnreliableNetwork, UnreliableNetwork2},
BoundedExecutor, CancelOnDropHandler, RetryConfig, MAX_TASK_CONCURRENCY,
};
use anemo::PeerId;
use async_trait::async_trait;
use crypto::PublicKey;
use multiaddr::Multiaddr;
use rand::{rngs::SmallRng, SeedableRng as _};
use std::collections::HashMap;
use tokio::{runtime::Handle, task::JoinHandle};
use tonic::{transport::Channel, Code};
use tracing::error;
use tonic::transport::Channel;
use types::{
BincodeEncodedPayload, PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient,
PrimaryWorkerMessage,
};

/// Network handle used to send `PrimaryMessage`s to other primaries over tonic channels.
pub struct PrimaryNetwork {
// Clients created so far, keyed by the address they connect to. Entries are added lazily by
// `BaseNetwork::client` and removed by `cleanup`.
clients: HashMap<Multiaddr, PrimaryToPrimaryClient<Channel>>,
// Connection settings used to create the (lazy) channel behind each new client.
config: mysten_network::config::Config,
// Retry policy handed to `spawn_with_retries` for reliable sends.
retry_config: RetryConfig,
/// Small RNG just used to shuffle nodes and randomize connections (not crypto related).
rng: SmallRng,
// One bounded executor per address, created on the first send to that address.
executors: HashMap<Multiaddr, BoundedExecutor>,
// Optional metrics sink; when set, the available capacity of every executor is reported
// after each send.
metrics: Option<Metrics<PrimaryNetworkMetrics>>,
}

/// Builds a [`BoundedExecutor`] limited to [`MAX_TASK_CONCURRENCY`] tasks that spawns onto the
/// current Tokio runtime.
///
/// NOTE: per the Tokio documentation, `Handle::current()` panics when called outside of a
/// runtime context, so this must only be used from within one.
fn default_executor() -> BoundedExecutor {
    let runtime = Handle::current();
    BoundedExecutor::new(MAX_TASK_CONCURRENCY, runtime)
}

impl Default for PrimaryNetwork {
fn default() -> Self {
let retry_config = RetryConfig {
// Retry for forever
retrying_max_elapsed_time: None,
..Default::default()
};

Self {
clients: Default::default(),
config: Default::default(),
retry_config,
rng: SmallRng::from_entropy(),
executors: HashMap::new(),
metrics: None,
}
}
}

impl PrimaryNetwork {
    /// Creates a network with default settings that reports through `metrics`, wrapped under
    /// the `"primary"` label.
    pub fn new(metrics: Metrics<PrimaryNetworkMetrics>) -> Self {
        let labelled = Metrics::from(metrics, "primary".to_string());
        let mut network = Self::default();
        network.metrics = Some(labelled);
        network
    }

    /// Publishes the available capacity of every executor, labelled with the executor's
    /// address. Does nothing when no metrics sink is configured.
    fn update_metrics(&self) {
        let metrics = match self.metrics.as_ref() {
            Some(metrics) => metrics,
            None => return,
        };

        self.executors.iter().for_each(|(addr, executor)| {
            let available = executor.available_capacity() as i64;
            metrics.set_network_available_tasks(available, Some(addr.to_string()));
        });
    }

    /// Drops the cached clients of the given addresses.
    ///
    /// Only the client cache is touched; the per-address executors are left in place.
    pub fn cleanup<'a, I>(&mut self, to_remove: I)
    where
        I: IntoIterator<Item = &'a Multiaddr>,
    {
        to_remove.into_iter().for_each(|address| {
            self.clients.remove(address);
        });
    }
}

impl BaseNetwork for PrimaryNetwork {
    type Client = PrimaryToPrimaryClient<Channel>;

    type Message = PrimaryMessage;

    /// Returns a client for `address`, creating and caching it on first use.
    ///
    /// The returned value is a clone of the cached client.
    fn client(&mut self, address: Multiaddr) -> Self::Client {
        self.clients
            .entry(address.clone())
            .or_insert_with(|| Self::create_client(&self.config, address))
            .clone()
    }

    /// Creates a client over a lazily-connected channel to `address`.
    ///
    /// # Panics
    ///
    /// Panics if `config.connect_lazy` fails for `address`; the panic message names the address
    /// and the underlying error.
    fn create_client(config: &mysten_network::config::Config, address: Multiaddr) -> Self::Client {
        // TODO don't panic here if address isn't supported
        let channel = config
            .connect_lazy(&address)
            .unwrap_or_else(|e| panic!("failed to create a lazy channel to {address}: {e:?}"));
        PrimaryToPrimaryClient::new(channel)
    }
}

#[async_trait]
impl UnreliableNetwork for PrimaryNetwork {
    /// Sends `message` to `address` once, on that address's bounded executor.
    ///
    /// The outcome of the send is discarded. The returned handle belongs to the spawned task.
    async fn unreliable_send_message(
        &mut self,
        address: Multiaddr,
        message: BincodeEncodedPayload,
    ) -> JoinHandle<()> {
        let mut client = self.client(address.clone());
        let executor = self
            .executors
            .entry(address)
            .or_insert_with(default_executor);

        // Best effort: whatever the send returns is dropped on purpose.
        let send_once = async move {
            let _ = client.send_message(message).await;
        };
        let handle = executor.spawn(send_once).await;

        self.update_metrics();

        handle
    }
}

#[async_trait]
impl LuckyNetwork for PrimaryNetwork {
    /// Gives mutable access to this network's (non-cryptographic) RNG.
    fn rng(&mut self) -> &mut SmallRng {
        let Self { rng, .. } = self;
        rng
    }
}

#[async_trait]
impl ReliableNetwork for PrimaryNetwork {
    /// Sends `message` to `address`, retrying according to `self.retry_config`.
    ///
    /// `FailedPrecondition` and `InvalidArgument` statuses are logged and reported as permanent
    /// failures; every other status is converted into a `backoff::Error` through `Into`, which
    /// the retry loop treats as retryable.
    ///
    /// Caveat (kept from the original notes): the retrying task is not bounded in time, so this
    /// should be called in a time-restricted fashion. The expected callers are
    /// [`PrimaryNetwork::broadcast`] and [`PrimaryNetwork::send`], at respectively N and K calls
    /// per round (N being the number of primaries, K the number of workers for this primary).
    /// See the TODO on `spawn_with_retries` for lifting this restriction.
    async fn send_message(
        &mut self,
        address: Multiaddr,
        message: BincodeEncodedPayload,
    ) -> CancelOnDropHandler<MessageResult> {
        let client = self.client(address.clone());

        // Invoked once per attempt, hence the fresh clones of the client and the payload.
        let message_send = move || {
            let mut attempt_client = client.clone();
            let payload = message.clone();

            async move {
                attempt_client.send_message(payload).await.map_err(|e| {
                    // These codes are not recoverable through retrying, see
                    // https://github.com/hyperium/tonic/blob/master/tonic/src/status.rs
                    let unrecoverable = matches!(
                        e.code(),
                        Code::FailedPrecondition | Code::InvalidArgument
                    );

                    if unrecoverable {
                        error!("Irrecoverable network error: {e}");
                        backoff::Error::permanent(eyre::Report::from(e))
                    } else {
                        // Yields a backoff::Error::Transient, so any other tonic::Status
                        // leads to a retry.
                        Into::<backoff::Error<eyre::Report>>::into(eyre::Report::from(e))
                    }
                })
            }
        };

        let executor = self
            .executors
            .entry(address)
            .or_insert_with(default_executor);
        let handle = executor.spawn_with_retries(self.retry_config, message_send);

        self.update_metrics();

        CancelOnDropHandler(handle)
    }
}
pub struct PrimaryToWorkerNetwork {
clients: HashMap<Multiaddr, PrimaryToWorkerClient<Channel>>,
config: mysten_network::config::Config,
Expand Down Expand Up @@ -263,3 +107,117 @@ impl UnreliableNetwork for PrimaryToWorkerNetwork {
handler
}
}

/// Network handle used to send `PrimaryMessage`s to other primaries over an `anemo::Network`.
pub struct PrimaryNetwork {
// Handle to the underlying anemo network; cloned into every spawned send task.
network: anemo::Network,
// Retry policy handed to `spawn_with_retries` for reliable sends.
retry_config: RetryConfig,
/// Small RNG just used to shuffle nodes and randomize connections (not crypto related).
rng: SmallRng,
// One bounded executor per peer (keyed by `anemo::PeerId`), created on the first send to
// that peer.
executors: HashMap<anemo::PeerId, BoundedExecutor>,
}

impl PrimaryNetwork {
    /// Wraps `network` with an entropy-seeded RNG, no executors yet and a retry policy that
    /// has no elapsed-time limit.
    pub fn new(network: anemo::Network) -> Self {
        Self {
            network,
            retry_config: RetryConfig {
                // No upper bound on the total retry time: keep retrying forever.
                retrying_max_elapsed_time: None,
                ..Default::default()
            },
            rng: SmallRng::from_entropy(),
            executors: HashMap::new(),
        }
    }

    /// Currently a no-op; the argument is ignored.
    pub fn cleanup<'a, I>(&mut self, _to_remove: I)
    where
        I: IntoIterator<Item = &'a Multiaddr>,
    {
        // TODO: this used to remove old clients on epoch changes. With the new networking
        // stack that may no longer be necessary, so whether this function is needed at all
        // has to be revisited. Until then it intentionally does nothing.
    }
}

#[async_trait]
impl UnreliableNetwork2<PrimaryMessage> for PrimaryNetwork {
    /// Sends a copy of `message` to `peer` once, on that peer's bounded executor.
    ///
    /// Nothing is sent when the network has no entry for the peer, and the outcome of the RPC
    /// is discarded either way. The returned handle belongs to the spawned task.
    async fn unreliable_send(
        &mut self,
        peer: PublicKey,
        message: &PrimaryMessage,
    ) -> JoinHandle<()> {
        let peer_id = PeerId(peer.0.to_bytes());
        let network = self.network.clone();
        let message = message.to_owned();

        let send_once = async move {
            let peer = match network.peer(peer_id) {
                Some(peer) => peer,
                // Unknown peer: silently give up, this is best effort.
                None => return,
            };
            let _ = PrimaryToPrimaryClient::new(peer)
                .send_message(message)
                .await;
        };

        let executor = self
            .executors
            .entry(peer_id)
            .or_insert_with(default_executor);
        executor.spawn(send_once).await
    }
}

impl Lucky for PrimaryNetwork {
    /// Gives mutable access to this network's (non-cryptographic) RNG.
    fn rng(&mut self) -> &mut SmallRng {
        let Self { rng, .. } = self;
        rng
    }
}

#[async_trait]
impl ReliableNetwork2<PrimaryMessage> for PrimaryNetwork {
async fn send(
&mut self,
peer: PublicKey,
message: &PrimaryMessage,
) -> CancelOnDropHandler<anyhow::Result<anemo::Response<()>>> {
// Safety
// Since this spawns an unbounded task, this should be called in a time-restricted fashion.
// Here the callers are [`PrimaryNetwork::broadcast`] and [`PrimaryNetwork::send`],
// at respectively N and K calls per round.
// (where N is the number of primaries, K the number of workers for this primary)
// See the TODO on spawn_with_retries for lifting this restriction.

let network = self.network.clone();
let peer_id = PeerId(peer.0.to_bytes());
let message = message.to_owned();
let message_send = move || {
let network = network.clone();
let message = message.clone();

async move {
if let Some(peer) = network.peer(peer_id) {
PrimaryToPrimaryClient::new(peer)
.send_message(message)
.await
.map_err(|e| {
// this returns a backoff::Error::Transient
// so that if anemo::Status is returned, we retry
backoff::Error::transient(anyhow::anyhow!("RPC error: {e:?}"))
})
} else {
Err(backoff::Error::transient(anyhow::anyhow!(
"not connected to peer {peer_id}"
)))
}
}
};

let handle = self
.executors
.entry(peer_id)
.or_insert_with(default_executor)
.spawn_with_retries(self.retry_config, message_send);

CancelOnDropHandler(handle)
}
}
Loading

0 comments on commit b76487d

Please sign in to comment.