Skip to content

Commit

Permalink
Merge pull request #127 from nervosnetwork/jjyr/sqlite-impl-peerstore
Browse files Browse the repository at this point in the history
refactor: use SQLite implement PeerStore to replace current MemoryPeerStore
  • Loading branch information
jjyr authored Jan 3, 2019
2 parents 7965805 + 8763d34 commit a1bdf98
Show file tree
Hide file tree
Showing 24 changed files with 1,290 additions and 1,067 deletions.
168 changes: 133 additions & 35 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,18 @@ futures = { version = "0.1.19", features = ["use_std"] }
snap = "0.2"
libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
faketime = "0.2.0"
rusqlite = {version = "0.16.0", features = ["trace"]}
lazy_static = "1.2.0"
multihash = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0" }
r2d2 = "0.8.3"
r2d2_sqlite = {git = "https://github.com/jjyr/r2d2-sqlite", rev="8ade92077977c4860562b1337b95ec8ed2000711"}

[dev-dependencies]
criterion = "0.2"
tempfile = "3.0.5"

[[bench]]
name = "sqlite_peer_store"
harness = false
path = "src/benches/sqlite_peer_store.rs"

142 changes: 142 additions & 0 deletions network/src/benches/sqlite_peer_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#[macro_use]
extern crate criterion;
extern crate ckb_network;
extern crate ckb_util;
extern crate tempfile;

use ckb_network::{
peer_store::{sqlite, PeerStore, SqlitePeerStore},
random_peer_id, Endpoint, ToMultiaddr,
};
use ckb_util::Mutex;
use criterion::Criterion;
use std::fs;
use std::rc::Rc;
use tempfile::tempdir;

fn insert_peer_info_benchmark(c: &mut Criterion) {
c.bench_function("insert 100 peer_info", |b| {
b.iter({
let mut peer_store = SqlitePeerStore::default();
let peer_ids = (0..100)
.map(|_| random_peer_id().unwrap())
.collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
for peer_id in peer_ids.clone() {
peer_store.new_connected_peer(&peer_id, addr.clone(), Endpoint::Dialer);
}
}
})
});
c.bench_function("insert 1000 peer_info", |b| {
b.iter({
let mut peer_store = SqlitePeerStore::default();
let peer_ids = (0..1000)
.map(|_| random_peer_id().unwrap())
.collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
for peer_id in peer_ids.clone() {
peer_store.new_connected_peer(&peer_id, addr.clone(), Endpoint::Dialer);
}
}
})
});

// filesystem benchmark
let dir = tempdir().expect("temp dir");
let file_path = dir.path().join("test.db").to_str().unwrap().to_string();
c.bench_function("insert 100 peer_info on filesystem", move |b| {
b.iter({
let file_path = file_path.clone();
let _ = fs::remove_file(file_path.clone());
let mut peer_store =
SqlitePeerStore::new(sqlite::open_pool(sqlite::StorePath::File(file_path), 8));
let peer_ids = (0..100)
.map(|_| random_peer_id().unwrap())
.collect::<Vec<_>>();
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
move || {
for peer_id in peer_ids.clone() {
peer_store.new_connected_peer(&peer_id, addr.clone(), Endpoint::Dialer);
}
}
})
});
}

fn random_order_benchmark(c: &mut Criterion) {
{
let peer_store = Rc::new(Mutex::new(SqlitePeerStore::default()));
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
{
let mut peer_store = peer_store.lock();
for _ in 0..8000 {
let peer_id = random_peer_id().unwrap();
peer_store.new_connected_peer(&peer_id, addr.clone(), Endpoint::Dialer);
let _ = peer_store.add_discovered_address(&peer_id, addr.clone());
}
}
c.bench_function("random order 1000 / 8000 peer_info", {
let peer_store = Rc::clone(&peer_store);
move |b| {
b.iter({
let peer_store = Rc::clone(&peer_store);
move || {
let peer_store = Rc::clone(&peer_store);
let count = 1000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
}
})
}
});
c.bench_function("random order 2000 / 8000 peer_info", {
let peer_store = Rc::clone(&peer_store);
move |b| {
b.iter({
let peer_store = Rc::clone(&peer_store);
move || {
let peer_store = Rc::clone(&peer_store);
let count = 2000;
assert_eq!(
peer_store.lock().peers_to_attempt(count).len() as u32,
count
);
}
})
}
});
}

// filesystem benchmark
let dir = tempdir().expect("temp dir");
let file_path = dir.path().join("test.db").to_str().unwrap().to_string();
c.bench_function(
"random order 1000 / 8000 peer_info on filesystem",
move |b| {
b.iter({
let file_path = file_path.clone();
let _ = fs::remove_file(file_path.clone());
let mut peer_store =
SqlitePeerStore::new(sqlite::open_pool(sqlite::StorePath::File(file_path), 8));
let addr = "/ip4/127.0.0.1".to_multiaddr().unwrap();
for _ in 0..8000 {
let peer_id = random_peer_id().unwrap();
peer_store.new_connected_peer(&peer_id, addr.clone(), Endpoint::Dialer);
let _ = peer_store.add_discovered_address(&peer_id, addr.clone());
}
move || {
let count = 1000;
assert_eq!(peer_store.peers_to_attempt(count).len() as u32, count);
}
})
},
);
}

criterion_group!(benches, insert_peer_info_benchmark, random_order_benchmark);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion network/src/ckb_protocol_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
// ban peer
fn ban_peer(&self, peer_index: PeerIndex, timeout: Duration) {
if let Some(peer_id) = self.network.get_peer_id(peer_index) {
self.network.ban_peer(peer_id, timeout)
self.network.ban_peer(&peer_id, timeout)
}
}
// disconnect from peer
Expand Down
19 changes: 3 additions & 16 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,19 @@ use faketime::unix_time_as_millis;
use futures::future::{self, Future};
use futures::Stream;
use libp2p::core::{Endpoint, Multiaddr, UniqueConnecState};
use libp2p::kad;
use log::{error, info};
use std::boxed::Box;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::sync::Arc;
use tokio;

pub struct CKBService {
// used to update kbuckets
pub kad_system: Arc<kad::KadSystem>,
}
pub struct CKBService;

impl CKBService {
fn handle_protocol_connection(
network: Arc<Network>,
peer_id: PeerId,
protocol_output: CKBProtocolOutput<Arc<CKBProtocolHandler>>,
kad_system: Arc<kad::KadSystem>,
addr: Multiaddr,
) -> Box<Future<Item = (), Error = IoError> + Send> {
let protocol_id = protocol_output.protocol_id;
Expand Down Expand Up @@ -78,10 +73,7 @@ impl CKBService {
let network = Arc::clone(&network);
let protocol_handler = Arc::clone(&protocol_handler);
let peer_id = peer_id.clone();
let kad_system = Arc::clone(&kad_system);
move |data| {
// update kad_system when we received data
kad_system.update_kbuckets(peer_id.clone());
network.modify_peer(&peer_id, |peer| {
peer.last_message_time = Some(unix_time_as_millis())
});
Expand Down Expand Up @@ -176,13 +168,8 @@ impl<T: Send> ProtocolService<T> for CKBService {
) -> Box<Future<Item = (), Error = IoError> + Send> {
match protocol {
Protocol::CKBProtocol(output, peer_id, addr) => {
let handling_future = Self::handle_protocol_connection(
network,
peer_id,
output,
Arc::clone(&self.kad_system),
addr,
);
let handling_future =
Self::handle_protocol_connection(network, peer_id, output, addr);
Box::new(handling_future) as Box<Future<Item = _, Error = _> + Send>
}
_ => Box::new(future::ok(())) as Box<Future<Item = _, Error = _> + Send>,
Expand Down
Loading

0 comments on commit a1bdf98

Please sign in to comment.