Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(sync): Fix get ancestor performance issue #970

Merged
merged 5 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ ckb-db = { path = "../db" }
env_logger = "0.6"
crossbeam-channel = "0.3"
test-chain-utils = { path = "../util/test-chain-utils" }
rand = "0.6"
6 changes: 5 additions & 1 deletion sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,17 @@ where
let mut inflight = self.synchronizer.peers.blocks_inflight.write();
let count = MAX_BLOCKS_IN_TRANSIT_PER_PEER
.saturating_sub(inflight.peer_inflight_count(&self.peer));
let max_height_header = self
.synchronizer
.shared
.get_ancestor(&best_known_header.hash(), max_height)?;

while index_height < max_height && fetch.len() < count {
index_height += 1;
let to_fetch = self
.synchronizer
.shared
.get_ancestor(&best_known_header.hash(), index_height)?;
.get_ancestor(max_height_header.hash(), index_height)?;
let to_fetch_hash = to_fetch.hash();

let block_status = self.synchronizer.get_block_status(to_fetch_hash);
Expand Down
23 changes: 17 additions & 6 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
pub const BLOCK_FETCH_TOKEN: u64 = 1;
pub const TIMEOUT_EVICTION_TOKEN: u64 = 2;
pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200);
const BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(400);
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);

bitflags! {
pub struct BlockStatus: u32 {
Expand Down Expand Up @@ -506,7 +509,8 @@ impl<CS: ChainStore> CKBProtocolHandler for Synchronizer<CS> {
// NOTE: 100ms is what bitcoin use.
nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN);
nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN);
nc.set_notify(BLOCK_FETCH_INTERVAL, BLOCK_FETCH_TOKEN);
nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN);
nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN);
nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN);
}

Expand Down Expand Up @@ -568,8 +572,15 @@ impl<CS: ChainStore> CKBProtocolHandler for Synchronizer<CS> {
SEND_GET_HEADERS_TOKEN => {
self.start_sync_headers(nc.as_ref());
}
BLOCK_FETCH_TOKEN => {
self.find_blocks_to_fetch(nc.as_ref());
IBD_BLOCK_FETCH_TOKEN => {
if self.shared.is_initial_block_download() {
self.find_blocks_to_fetch(nc.as_ref());
}
}
NOT_IBD_BLOCK_FETCH_TOKEN => {
if !self.shared.is_initial_block_download() {
self.find_blocks_to_fetch(nc.as_ref());
}
}
TIMEOUT_EVICTION_TOKEN => {
self.eviction(nc.as_ref());
Expand Down
8 changes: 6 additions & 2 deletions sync/src/tests/synchronizer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::synchronizer::{BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN, TIMEOUT_EVICTION_TOKEN};
use crate::synchronizer::{
IBD_BLOCK_FETCH_TOKEN, NOT_IBD_BLOCK_FETCH_TOKEN, SEND_GET_HEADERS_TOKEN,
TIMEOUT_EVICTION_TOKEN,
};
use crate::tests::TestNode;
use crate::{Config, NetworkProtocol, SyncSharedState, Synchronizer};
use ckb_chain::chain::ChainService;
Expand Down Expand Up @@ -143,7 +146,8 @@ fn setup_node(
&protocol,
&[
SEND_GET_HEADERS_TOKEN,
BLOCK_FETCH_TOKEN,
IBD_BLOCK_FETCH_TOKEN,
NOT_IBD_BLOCK_FETCH_TOKEN,
TIMEOUT_EVICTION_TOKEN,
],
);
Expand Down
174 changes: 154 additions & 20 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ pub struct HeaderView {
inner: Header,
total_difficulty: U256,
total_uncles_count: u64,
// pointer to the index of some further predecessor of this block
skip_hash: Option<H256>,
}

impl HeaderView {
Expand All @@ -477,6 +479,7 @@ impl HeaderView {
inner,
total_difficulty,
total_uncles_count,
skip_hash: None,
}
}

Expand All @@ -488,6 +491,10 @@ impl HeaderView {
self.inner.hash()
}

pub fn parent_hash(&self) -> &H256 {
self.inner.parent_hash()
}

pub fn timestamp(&self) -> u64 {
self.inner.timestamp()
}
Expand All @@ -507,6 +514,69 @@ impl HeaderView {
pub fn into_inner(self) -> Header {
self.inner
}

pub fn build_skip<F>(&mut self, mut get_header_view: F)
where
F: FnMut(&H256) -> Option<HeaderView>,
{
self.skip_hash = get_header_view(self.parent_hash())
.and_then(|parent| parent.get_ancestor(get_skip_height(self.number()), get_header_view))
.map(|header| header.hash().clone());
}

// NOTE: get_header_view may change source state, for cache or for tests
pub fn get_ancestor<F>(self, number: BlockNumber, mut get_header_view: F) -> Option<Header>
where
F: FnMut(&H256) -> Option<HeaderView>,
{
let mut current = self;
if number > current.number() {
return None;
}
let mut number_walk = current.number();
while number_walk > number {
let number_skip = get_skip_height(number_walk);
let number_skip_prev = get_skip_height(number_walk - 1);
match current.skip_hash {
Some(ref hash)
if number_skip == number
|| (number_skip > number
&& !(number_skip_prev + 2 < number_skip
&& number_skip_prev >= number)) =>
{
// Only follow skip if parent->skip isn't better than skip->parent
current = get_header_view(hash)?;
number_walk = number_skip;
}
_ => {
current = get_header_view(current.parent_hash())?;
number_walk -= 1;
}
}
}
Some(current.clone()).map(HeaderView::into_inner)
}
}

// Compute what height to jump back to with the skip pointer.
fn get_skip_height(height: BlockNumber) -> BlockNumber {
// Turn the lowest '1' bit in the binary representation of a number into a '0'.
fn invert_lowest_one(n: i64) -> i64 {
n & (n - 1)
}

if height < 2 {
return 0;
}

// Determine which height to jump back to. Any number strictly lower than height is acceptable,
// but the following expression seems to perform well in simulations (max 110 steps to go back
// up to 2**18 blocks).
if (height & 1) > 0 {
invert_lowest_one(invert_lowest_one(height as i64 - 1)) as u64 + 1
} else {
invert_lowest_one(height as i64) as u64
}
}

#[derive(Default)]
Expand Down Expand Up @@ -600,8 +670,9 @@ impl<CS: ChainStore> SyncSharedState<CS> {
*self.best_known_header.write() = header;
}

pub fn insert_header_view(&self, hash: H256, header: HeaderView) {
self.header_map.write().insert(hash, header);
pub fn insert_header_view(&self, hash: H256, mut view: HeaderView) {
view.build_skip(|hash| self.get_header_view(hash));
self.header_map.write().insert(hash, view);
}
pub fn remove_header_view(&self, hash: &H256) {
self.header_map.write().remove(hash);
Expand Down Expand Up @@ -662,24 +733,8 @@ impl<CS: ChainStore> SyncSharedState<CS> {
}

pub fn get_ancestor(&self, base: &H256, number: BlockNumber) -> Option<Header> {
if let Some(header) = self.get_header(base) {
let mut n_number = header.number();
let mut index_walk = header;
if number > n_number {
return None;
}

while n_number > number {
if let Some(header) = self.get_header(&index_walk.parent_hash()) {
index_walk = header;
n_number -= 1;
} else {
return None;
}
}
return Some(index_walk);
}
None
self.get_header_view(base)?
.get_ancestor(number, |hash| self.get_header_view(hash))
}

pub fn get_locator(&self, start: &Header) -> Vec<H256> {
Expand Down Expand Up @@ -844,3 +899,82 @@ impl<CS: ChainStore> SyncSharedState<CS> {
);
}
}

#[cfg(test)]
mod tests {
use super::*;

use ckb_core::header::HeaderBuilder;
use rand::{thread_rng, Rng};

const SKIPLIST_LENGTH: u64 = 500_000;

#[test]
fn test_get_ancestor_use_skip_list() {
let mut header_map: HashMap<H256, HeaderView> = HashMap::default();
let mut hashes: BTreeMap<BlockNumber, H256> = BTreeMap::default();

let mut parent_hash = None;
for number in 0..SKIPLIST_LENGTH {
let mut header_builder = HeaderBuilder::default().number(number);
if let Some(parent_hash) = parent_hash.take() {
header_builder = header_builder.parent_hash(parent_hash);
}
let header = header_builder.build();
hashes.insert(number, header.hash().clone());
parent_hash = Some(header.hash().clone());

let mut view = HeaderView::new(header, U256::zero(), 0);
view.build_skip(|hash| header_map.get(hash).cloned());
header_map.insert(view.hash().clone(), view);
}

for (number, hash) in &hashes {
if *number > 0 {
let skip_view = header_map
.get(hash)
.and_then(|view| header_map.get(view.skip_hash.as_ref().unwrap()))
.unwrap();
assert_eq!(Some(skip_view.hash()), hashes.get(&skip_view.number()));
assert!(skip_view.number() < *number);
} else {
assert!(header_map[hash].skip_hash.is_none());
}
}

let mut rng = thread_rng();
let a_to_b = |a, b, limit| {
let mut count = 0;
let header = header_map
.get(&hashes[&a])
.cloned()
.unwrap()
.get_ancestor(b, |hash| {
count += 1;
header_map.get(hash).cloned()
})
.unwrap();

// Search must finished in <limit> steps
assert!(count <= limit);

header
};
for _ in 0..1000 {
let from: u64 = rng.gen_range(0, SKIPLIST_LENGTH);
let to: u64 = rng.gen_range(0, from);
let view_from = &header_map[&hashes[&from]];
let view_to = &header_map[&hashes[&to]];
let view_0 = &header_map[&hashes[&0]];

let found_from_header = a_to_b(SKIPLIST_LENGTH - 1, from, 120);
assert_eq!(found_from_header.hash(), view_from.hash());

let found_to_header = a_to_b(from, to, 120);
assert_eq!(found_to_header.hash(), view_to.hash());

let found_0_header = a_to_b(from, 0, 120);
assert_eq!(found_0_header.hash(), view_0.hash());
}
}
}