Skip to content

Commit

Permalink
fix: prevent multi times dialing kad connection to the same peer (#20)
Browse files Browse the repository at this point in the history
* fix: prevent multi times dialing kad connection to the same peer

* fix: remove unused line

* fix: update network log
  • Loading branch information
jjyr authored and quake committed Nov 26, 2018
1 parent 7119a40 commit 01bcaf4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 16 deletions.
78 changes: 64 additions & 14 deletions network/src/discovery_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ impl DiscoveryService {
kademlia_stream: Box<Stream<Item = kad::KadIncomingRequest, Error = IoError> + Send>,
) -> Result<Box<Future<Item = (), Error = IoError> + Send>, IoError> {
let handling_future = Box::new(
// why use loop_fn?????????????
// does client disconnect after discovery?
future::loop_fn(kademlia_stream, {
let peer_id = peer_id.clone();
let kad_system = Arc::clone(&self.kad_system);
Expand Down Expand Up @@ -179,7 +177,7 @@ impl DiscoveryService {
if let Some(to_notify) = kad_manage.to_notify.take() {
to_notify.notify();
}
kad_manage.new_kad_connection(peer_id.clone(), kad_connection_controller.clone());
kad_manage.complete_kad_connection(peer_id.clone(), kad_connection_controller.clone());
kad_manage.fetch_unique_connec(peer_id.clone())
};
Ok(Box::new(
Expand All @@ -190,11 +188,11 @@ impl DiscoveryService {
// drop kad connection
move |val| {
info!("kad exit because {:?}", val);
//kad_manage.lock().drop_connection(&peer_id);
let mut kad_manage = kad_manage.lock();
if let Some(to_notify) = kad_manage.to_notify.take() {
to_notify.notify();
}
kad_manage.drop_connection(&peer_id);
val
}
}),
Expand Down Expand Up @@ -503,29 +501,42 @@ where

let mut kad_manage = self.kad_manage.lock();
kad_manage.to_notify = Some(task::current());
trace!(target: "discovery", "handling discovery loop");
Ok(Async::NotReady)
}
}

const MAX_DIALING_COUNT: usize = 30;
const MAX_CONNECTING_COUNT: usize = 30;
const KAD_DIAL_TIMEOUT_SECS: u64 = 15;

pub(crate) struct KadManage {
connected_kad_peers: FnvHashMap<PeerId, UniqueConnec<kad::KadConnecController>>,
kad_connections: FnvHashMap<PeerId, UniqueConnec<kad::KadConnecController>>,
kad_pending_dials: FnvHashMap<PeerId, Vec<oneshot::Sender<kad::KadConnecController>>>,
kad_dialing_peers: FnvHashMap<PeerId, Instant>,
kad_upgrade: kad::KadConnecConfig,
pub(crate) to_notify: Option<task::Task>,
}

impl KadManage {
pub fn new(_network: Arc<Network>, kad_upgrade: kad::KadConnecConfig) -> Self {
KadManage {
connected_kad_peers: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
kad_connections: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
kad_pending_dials: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
kad_dialing_peers: FnvHashMap::with_capacity_and_hasher(10, Default::default()),
kad_upgrade,
to_notify: None,
}
}

fn new_kad_connection(
// check and remove timeout dials
fn check_unused_conn(&mut self) {
let now = Instant::now();
let timeout = Duration::from_secs(KAD_DIAL_TIMEOUT_SECS);
self.kad_dialing_peers
.retain(move |_peer_id, added_at| now.duration_since(*added_at) > timeout);
}

fn complete_kad_connection(
&mut self,
peer_id: PeerId,
kad_connection_controller: kad::KadConnecController,
Expand All @@ -537,10 +548,24 @@ impl KadManage {
let _ = tx.send(kad_connection_controller.clone());
}
}
// remove dialing status
self.kad_dialing_peers.remove(&peer_id);
}

fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
self.kad_connections
.iter()
.filter_map(|(key, unique_connec)| {
if unique_connec.is_alive() {
Some(key)
} else {
None
}
})
}

fn fetch_unique_connec(&mut self, peer_id: PeerId) -> UniqueConnec<kad::KadConnecController> {
self.connected_kad_peers
self.kad_connections
.entry(peer_id)
.or_insert_with(UniqueConnec::empty)
.to_owned()
Expand All @@ -553,7 +578,7 @@ impl KadManage {
addr: &Multiaddr,
transport: Tran,
swarm_controller: &SwarmController<St, Box<Future<Item = (), Error = IoError> + Send>>,
) -> Result<UniqueConnec<kad::KadConnecController>, IoError>
) -> Result<(), IoError>
where
Tran: MuxedTransport<Output = TransportOutput<To>> + Clone + Send + 'static,
Tran::MultiaddrFuture: Send + 'static,
Expand All @@ -574,8 +599,34 @@ impl KadManage {
debug!(target: "discovery", "dial kad connection to {:?} {:?}", peer_id, addr);
let kad_connection = self.fetch_unique_connec(peer_id.clone());
if kad_connection.is_alive() {
return Ok(kad_connection);
return Ok(());
}

// remove unused connections
self.check_unused_conn();
// check peer
if self.kad_dialing_peers.contains_key(&peer_id)
|| self.kad_dialing_peers.len() >= MAX_DIALING_COUNT
{
debug!(target: "discovery", "we are already dialing to {:?} {:?}", peer_id, addr);
return Ok(());
}
let is_connected = self
.kad_connections
.get(&peer_id)
.map(|unique_connec| unique_connec.is_alive())
.unwrap_or(false);
let count_of_connected_peers = self.connected_peers().collect::<Vec<_>>().len();
if is_connected || count_of_connected_peers >= MAX_CONNECTING_COUNT {
debug!(target: "discovery", "we are already connected to {:?} {:?}", peer_id, addr);
// should return a error?
return Ok(());
}
//set peer to dialing list
self.kad_dialing_peers
.entry(peer_id.clone())
.or_insert_with(Instant::now);

let kad_upgrade = self.kad_upgrade.clone();
let transport = transport
.and_then(move |out, endpoint, client_addr| {
Expand All @@ -598,12 +649,11 @@ impl KadManage {
});

let _ = kad_connection.dial(swarm_controller, addr, transport);
Ok(kad_connection)
Ok(())
}

#[allow(dead_code)]
fn drop_connection(&mut self, peer_id: &PeerId) {
debug!(target: "discovery","disconnect kad connection from {:?}", peer_id);
self.connected_kad_peers.remove(peer_id);
self.kad_connections.remove(peer_id);
}
}
4 changes: 2 additions & 2 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,10 @@ impl Network {
let expected_peer_id = expected_peer_id.clone();
move |(peer_id, protocol), _, client_addr| {
if peer_id == expected_peer_id {
trace!("dial success to {:?}", peer_id);
debug!(target: "network", "success connect to {:?}", peer_id);
future::ok((protocol, client_addr))
} else {
trace!("dial peer id mismatch {:?}", peer_id);
debug!(target: "network", "connected peer id mismatch {:?}, disconnect!", peer_id);
//Because multiaddrs is responsed by a third-part node, the mismatched
//peer itself should not seems as a misbehaviour peer.
//So we do not report this behaviour
Expand Down

0 comments on commit 01bcaf4

Please sign in to comment.