diff --git a/Cargo.lock b/Cargo.lock index 99a134df93..12b8db7920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -828,6 +828,7 @@ dependencies = [ "lru-cache 0.1.0 (git+https://github.com/nervosnetwork/lru-cache?rev=a35fdb8)", "numext-fixed-hash 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "numext-fixed-uint 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "sentry 0.15.5 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/sync/Cargo.toml b/sync/Cargo.toml index e33787490f..2a48c57852 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -42,3 +42,4 @@ ckb-db = { path = "../db" } env_logger = "0.6" crossbeam-channel = "0.3" test-chain-utils = { path = "../util/test-chain-utils" } +rand = "0.6" diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index 14a8bf729a..290323ca99 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -168,13 +168,17 @@ where let mut inflight = self.synchronizer.peers.blocks_inflight.write(); let count = MAX_BLOCKS_IN_TRANSIT_PER_PEER .saturating_sub(inflight.peer_inflight_count(&self.peer)); + let max_height_header = self + .synchronizer + .shared + .get_ancestor(&best_known_header.hash(), max_height)?; while index_height < max_height && fetch.len() < count { index_height += 1; let to_fetch = self .synchronizer .shared - .get_ancestor(&best_known_header.hash(), index_height)?; + .get_ancestor(max_height_header.hash(), index_height)?; let to_fetch_hash = to_fetch.hash(); let block_status = self.synchronizer.get_block_status(to_fetch_hash); diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 8ec22304ac..39af43d139 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -39,11 +39,14 @@ use std::sync::Arc; use std::time::{Duration, Instant}; pub const SEND_GET_HEADERS_TOKEN: u64 = 0; -pub const BLOCK_FETCH_TOKEN: u64 = 1; -pub const TIMEOUT_EVICTION_TOKEN: u64 = 2; +pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1; +pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2; +pub const TIMEOUT_EVICTION_TOKEN: u64 = 3; pub const NO_PEER_CHECK_TOKEN: u64 = 255; + const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200); -const BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(400); +const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40); +const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); bitflags! { pub struct BlockStatus: u32 { @@ -506,7 +509,8 @@ impl CKBProtocolHandler for Synchronizer { // NOTE: 100ms is what bitcoin use. nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN); nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN); - nc.set_notify(BLOCK_FETCH_INTERVAL, BLOCK_FETCH_TOKEN); + nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN); + nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN); nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN); } @@ -568,8 +572,15 @@ impl CKBProtocolHandler for Synchronizer { SEND_GET_HEADERS_TOKEN => { self.start_sync_headers(nc.as_ref()); } - BLOCK_FETCH_TOKEN => { - self.find_blocks_to_fetch(nc.as_ref()); + IBD_BLOCK_FETCH_TOKEN => { + if self.shared.is_initial_block_download() { + self.find_blocks_to_fetch(nc.as_ref()); + } + } + NOT_IBD_BLOCK_FETCH_TOKEN => { + if !self.shared.is_initial_block_download() { + self.find_blocks_to_fetch(nc.as_ref()); + } } TIMEOUT_EVICTION_TOKEN => { self.eviction(nc.as_ref()); diff --git a/sync/src/tests/synchronizer.rs b/sync/src/tests/synchronizer.rs index ab118766a0..194950c757 100644 --- a/sync/src/tests/synchronizer.rs +++ b/sync/src/tests/synchronizer.rs @@ -1,4 +1,7 @@ -use crate::synchronizer::{BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN, TIMEOUT_EVICTION_TOKEN}; +use crate::synchronizer::{ + IBD_BLOCK_FETCH_TOKEN, NOT_IBD_BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN, + TIMEOUT_EVICTION_TOKEN, +}; use crate::tests::TestNode; use crate::{Config, NetworkProtocol, SyncSharedState, Synchronizer}; use ckb_chain::chain::ChainService; @@ -143,7 +146,8 @@ fn setup_node( &protocol, &[ SEND_GET_HEADERS_TOKEN, - BLOCK_FETCH_TOKEN, + IBD_BLOCK_FETCH_TOKEN, + NOT_IBD_BLOCK_FETCH_TOKEN, TIMEOUT_EVICTION_TOKEN, ], ); diff --git a/sync/src/types.rs b/sync/src/types.rs index 1d826471f3..02700a9e3e 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -469,6 +469,8 @@ pub struct HeaderView { inner: Header, total_difficulty: U256, total_uncles_count: u64, + // pointer to the index of some further predecessor of this block + skip_hash: Option, } impl HeaderView { @@ -477,6 +479,7 @@ impl HeaderView { inner, total_difficulty, total_uncles_count, + skip_hash: None, } } @@ -488,6 +491,10 @@ impl HeaderView { self.inner.hash() } + pub fn parent_hash(&self) -> &H256 { + self.inner.parent_hash() + } + pub fn timestamp(&self) -> u64 { self.inner.timestamp() } @@ -507,6 +514,69 @@ impl HeaderView { pub fn into_inner(self) -> Header { self.inner } + + pub fn build_skip(&mut self, mut get_header_view: F) + where + F: FnMut(&H256) -> Option, + { + self.skip_hash = get_header_view(self.parent_hash()) + .and_then(|parent| parent.get_ancestor(get_skip_height(self.number()), get_header_view)) + .map(|header| header.hash().clone()); + } + + // NOTE: get_header_view may change source state, for cache or for tests + pub fn get_ancestor(self, number: BlockNumber, mut get_header_view: F) -> Option
+ where + F: FnMut(&H256) -> Option, + { + let mut current = self; + if number > current.number() { + return None; + } + let mut number_walk = current.number(); + while number_walk > number { + let number_skip = get_skip_height(number_walk); + let number_skip_prev = get_skip_height(number_walk - 1); + match current.skip_hash { + Some(ref hash) + if number_skip == number + || (number_skip > number + && !(number_skip_prev + 2 < number_skip + && number_skip_prev >= number)) => + { + // Only follow skip if parent->skip isn't better than skip->parent + current = get_header_view(hash)?; + number_walk = number_skip; + } + _ => { + current = get_header_view(current.parent_hash())?; + number_walk -= 1; + } + } + } + Some(current.clone()).map(HeaderView::into_inner) + } +} + +// Compute what height to jump back to with the skip pointer. +fn get_skip_height(height: BlockNumber) -> BlockNumber { + // Turn the lowest '1' bit in the binary representation of a number into a '0'. + fn invert_lowest_one(n: i64) -> i64 { + n & (n - 1) + } + + if height < 2 { + return 0; + } + + // Determine which height to jump back to. Any number strictly lower than height is acceptable, + // but the following expression seems to perform well in simulations (max 110 steps to go back + // up to 2**18 blocks). + if (height & 1) > 0 { + invert_lowest_one(invert_lowest_one(height as i64 - 1)) as u64 + 1 + } else { + invert_lowest_one(height as i64) as u64 + } } #[derive(Default)] @@ -600,8 +670,9 @@ impl SyncSharedState { *self.best_known_header.write() = header; } - pub fn insert_header_view(&self, hash: H256, header: HeaderView) { - self.header_map.write().insert(hash, header); + pub fn insert_header_view(&self, hash: H256, mut view: HeaderView) { + view.build_skip(|hash| self.get_header_view(hash)); + self.header_map.write().insert(hash, view); } pub fn remove_header_view(&self, hash: &H256) { self.header_map.write().remove(hash); @@ -662,24 +733,8 @@ impl SyncSharedState { } pub fn get_ancestor(&self, base: &H256, number: BlockNumber) -> Option
{ - if let Some(header) = self.get_header(base) { - let mut n_number = header.number(); - let mut index_walk = header; - if number > n_number { - return None; - } - - while n_number > number { - if let Some(header) = self.get_header(&index_walk.parent_hash()) { - index_walk = header; - n_number -= 1; - } else { - return None; - } - } - return Some(index_walk); - } - None + self.get_header_view(base)? + .get_ancestor(number, |hash| self.get_header_view(hash)) } pub fn get_locator(&self, start: &Header) -> Vec { @@ -844,3 +899,82 @@ impl SyncSharedState { ); } } + +#[cfg(test)] +mod tests { + use super::*; + + use ckb_core::header::HeaderBuilder; + use rand::{thread_rng, Rng}; + + const SKIPLIST_LENGTH: u64 = 500_000; + + #[test] + fn test_get_ancestor_use_skip_list() { + let mut header_map: HashMap = HashMap::default(); + let mut hashes: BTreeMap = BTreeMap::default(); + + let mut parent_hash = None; + for number in 0..SKIPLIST_LENGTH { + let mut header_builder = HeaderBuilder::default().number(number); + if let Some(parent_hash) = parent_hash.take() { + header_builder = header_builder.parent_hash(parent_hash); + } + let header = header_builder.build(); + hashes.insert(number, header.hash().clone()); + parent_hash = Some(header.hash().clone()); + + let mut view = HeaderView::new(header, U256::zero(), 0); + view.build_skip(|hash| header_map.get(hash).cloned()); + header_map.insert(view.hash().clone(), view); + } + + for (number, hash) in &hashes { + if *number > 0 { + let skip_view = header_map + .get(hash) + .and_then(|view| header_map.get(view.skip_hash.as_ref().unwrap())) + .unwrap(); + assert_eq!(Some(skip_view.hash()), hashes.get(&skip_view.number())); + assert!(skip_view.number() < *number); + } else { + assert!(header_map[hash].skip_hash.is_none()); + } + } + + let mut rng = thread_rng(); + let a_to_b = |a, b, limit| { + let mut count = 0; + let header = header_map + .get(&hashes[&a]) + .cloned() + .unwrap() + .get_ancestor(b, |hash| { + count += 1; + header_map.get(hash).cloned() + }) + .unwrap(); + + // Search must finished in steps + assert!(count <= limit); + + header + }; + for _ in 0..1000 { + let from: u64 = rng.gen_range(0, SKIPLIST_LENGTH); + let to: u64 = rng.gen_range(0, from); + let view_from = &header_map[&hashes[&from]]; + let view_to = &header_map[&hashes[&to]]; + let view_0 = &header_map[&hashes[&0]]; + + let found_from_header = a_to_b(SKIPLIST_LENGTH - 1, from, 120); + assert_eq!(found_from_header.hash(), view_from.hash()); + + let found_to_header = a_to_b(from, to, 120); + assert_eq!(found_to_header.hash(), view_to.hash()); + + let found_0_header = a_to_b(from, 0, 120); + assert_eq!(found_0_header.hash(), view_0.hash()); + } + } +}