From 52441dff48444543a2e8b0ae039178d227565d2f Mon Sep 17 00:00:00 2001 From: jjy Date: Thu, 15 Nov 2018 20:59:30 +0800 Subject: [PATCH] feat(network): use snappy to compress data in ckb protocol --- Cargo.lock | 15 +++++++++-- network/Cargo.toml | 1 + network/src/ckb_protocol.rs | 50 +++++++++++++++++++++++++++++-------- network/src/lib.rs | 1 + 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c899d7b957..2049af9df8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,6 +493,7 @@ dependencies = [ "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.80 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.79 (registry+https://github.com/rust-lang/crates.io-index)", + "snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "unsigned-varint 0.2.1 (git+https://github.com/paritytech/unsigned-varint)", ] @@ -774,7 +775,7 @@ dependencies = [ "clicolors-control 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "termios 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "unicode-width 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1313,7 +1314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "console 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2733,6 +2734,15 @@ dependencies = [ "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "snap" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "stable_deref_trait" version = "1.1.1" @@ -3704,6 +3714,7 @@ dependencies = [ "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1347484b6f8bc4b32a9323d9800b6d934376391002ad9c528cc659fe8afc08ee" "checksum smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "153ffa32fd170e9944f7e0838edf824a754ec4c1fc64746fcc9fe1f8fa602e5d" +"checksum snap 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "95d697d63d44ad8b78b8d235bf85b34022a78af292c8918527c5f0cffdde7f43" "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum stdweb 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ef5430c8e36b713e13b48a9f709cc21e046723fe44ce34587b73a830203b533e" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" diff --git a/network/Cargo.toml b/network/Cargo.toml index 63ff0fea3b..b6bd1fb51d 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -15,4 +15,5 @@ log = "0.4.5" bytes = "0.4.9" tokio = "0.1.8" futures = { version = "0.1.19", features = ["use_std"] } +snap = "0.2" libp2p = { git = "https://github.com/libp2p/rust-libp2p", rev="cfdfca1a06fb2deb9ebcc15a63d715ebddb23bd0", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } diff --git a/network/src/ckb_protocol.rs b/network/src/ckb_protocol.rs index e60b5ced23..2ad382180b 100644 --- a/network/src/ckb_protocol.rs +++ b/network/src/ckb_protocol.rs @@ -1,10 +1,14 @@ #![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] use super::{Error, ProtocolId}; +use bytes::BufMut; +use bytes::{Buf, IntoBuf}; use bytes::{Bytes, BytesMut}; use futures::sync::mpsc; use futures::{future, stream, Future, Sink, Stream}; use libp2p::core::{ConnectionUpgrade, Endpoint, Multiaddr}; +use snap; +use std::io; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::string::ToString; use std::vec::IntoIter as VecIntoIter; @@ -170,23 +174,49 @@ impl CKBProtocol { Some(stream.into_future().map_err(|(err, _)| err).and_then( move |(message, stream)| match message { - Some(Message::Recv(data)) => { - if data.is_empty() { + Some(Message::Recv(compressed_data)) => { + if compressed_data.is_empty() { debug!("receive a empty message, ignoring"); let f = future::ok((None, (sink, stream, false))); return future::Either::A(f); } - - let out = Some(data.freeze()); - let f = future::ok((out, (sink, stream, false))); - future::Either::A(f) + // decompress data + let mut decompresser = snap::Reader::new(compressed_data.freeze().into_buf().reader()); + let mut data = vec![].writer(); + match io::copy(&mut decompresser, &mut data) { + Ok(_) => { + let out = Some(data.into_inner().into()); + let f = future::ok((out, (sink, stream, false))); + future::Either::A(f) + }, + Err(e) => { + future::Either::A(future::err(e)) + } + } } Some(Message::SendData(data)) => { - let fut = sink - .send(data) - .map(move |sink| (None, (sink, stream, false))); - future::Either::B(fut) + let mut compressed_data = vec![].writer(); + let mut compresser = snap::Writer::new(compressed_data); + let mut data_buf = data.into_buf(); + match io::copy(&mut data_buf.reader(), &mut compresser) { + Ok(_) => { + match compresser.into_inner() { + Ok(compressed_data) => { + let compressed_data : Bytes = compressed_data.into_inner().into(); + let fut = sink + .send(compressed_data) + .map(move |sink| (None, (sink, stream, false))); + future::Either::B(fut) + }, + Err(e) => { + future::Either::A(future::err(IoError::new(IoErrorKind::Other, format!("compressed data error {}", e.to_string())))) + } + }} + Err(e) => { + future::Either::A(future::err(e)) + } + } } Some(Message::Finished) | None => { diff --git a/network/src/lib.rs b/network/src/lib.rs index d5e1d4873b..1be0a3163c 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -3,6 +3,7 @@ extern crate bytes; extern crate futures; extern crate libp2p; extern crate rand; +extern crate snap; extern crate tokio; extern crate unsigned_varint; #[macro_use]