feat: Shadow tracking #11689

Merged: 5 commits, Jul 5, 2024
19 changes: 19 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -957,6 +957,25 @@ impl EpochManagerAdapter for MockEpochManager {
        Ok(true)
    }

    fn cares_about_shard_in_epoch(
        &self,
        epoch_id: EpochId,
        account_id: &AccountId,
        shard_id: ShardId,
    ) -> Result<bool, EpochError> {
        // This `unwrap` here tests that in all code paths we check that the epoch exists before
        // we check if we care about a shard. Please do not remove the unwrap; fix the logic of
        // the calling function instead.
        let epoch_valset = self.get_valset_for_epoch(&epoch_id).unwrap();
        let chunk_producers = self.get_chunk_producers(epoch_valset, shard_id);
        for validator in chunk_producers {
            if validator.account_id() == account_id {
                return Ok(true);
            }
        }
        Ok(false)
    }

    fn cares_about_shard_from_prev_block(
        &self,
        parent_hash: &CryptoHash,
20 changes: 18 additions & 2 deletions chain/epoch-manager/src/adapter.rs
@@ -8,8 +8,7 @@ use near_primitives::block::Tip;
use near_primitives::block_header::{Approval, ApprovalInner, BlockHeader};
use near_primitives::epoch_manager::block_info::BlockInfo;
use near_primitives::epoch_manager::epoch_info::EpochInfo;
use near_primitives::epoch_manager::EpochConfig;
use near_primitives::epoch_manager::ShardConfig;
use near_primitives::epoch_manager::{EpochConfig, ShardConfig};
use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout, ShardLayoutError};
@@ -422,6 +421,13 @@ pub trait EpochManagerAdapter: Send + Sync {
        partial_witness: &PartialEncodedStateWitness,
    ) -> Result<bool, Error>;

    fn cares_about_shard_in_epoch(
        &self,
        epoch_id: EpochId,
        account_id: &AccountId,
        shard_id: ShardId,
    ) -> Result<bool, EpochError>;

    fn cares_about_shard_from_prev_block(
        &self,
        parent_hash: &CryptoHash,
@@ -1142,4 +1148,14 @@ impl EpochManagerAdapter for EpochManagerHandle {
        let epoch_manager = self.read();
        Ok(epoch_manager.get_epoch_info(epoch_id)?.validators_iter().collect::<Vec<_>>())
    }

    fn cares_about_shard_in_epoch(
        &self,
        epoch_id: EpochId,
        account_id: &AccountId,
        shard_id: ShardId,
    ) -> Result<bool, EpochError> {
        let epoch_manager = self.read();
        epoch_manager.cares_about_shard_in_epoch(epoch_id, account_id, shard_id)
    }
}
38 changes: 19 additions & 19 deletions chain/epoch-manager/src/lib.rs
@@ -1138,6 +1138,25 @@ impl EpochManager {
        self.get_epoch_info(&epoch_id)
    }

    pub fn cares_about_shard_in_epoch(
        &self,
        epoch_id: EpochId,
        account_id: &AccountId,
        shard_id: ShardId,
    ) -> Result<bool, EpochError> {
        let epoch_info = self.get_epoch_info(&epoch_id)?;
        let chunk_producers_settlement = epoch_info.chunk_producers_settlement();
        let chunk_producers = chunk_producers_settlement
            .get(shard_id as usize)
            .ok_or_else(|| EpochError::ShardingError(format!("invalid shard id {shard_id}")))?;
        for validator_id in chunk_producers.iter() {
            if epoch_info.validator_account_id(*validator_id) == account_id {
                return Ok(true);
            }
        }
        Ok(false)
    }

    pub fn cares_about_shard_from_prev_block(
        &self,
        parent_hash: &CryptoHash,
@@ -1630,25 +1649,6 @@ impl EpochManager {

/// Private utilities for EpochManager.
impl EpochManager {
    fn cares_about_shard_in_epoch(
        &self,
        epoch_id: EpochId,
        account_id: &AccountId,
        shard_id: ShardId,
    ) -> Result<bool, EpochError> {
        let epoch_info = self.get_epoch_info(&epoch_id)?;
        let chunk_producers_settlement = epoch_info.chunk_producers_settlement();
        let chunk_producers = chunk_producers_settlement
            .get(shard_id as usize)
            .ok_or_else(|| EpochError::ShardingError(format!("invalid shard id {shard_id}")))?;
        for validator_id in chunk_producers.iter() {
            if epoch_info.validator_account_id(*validator_id) == account_id {
                return Ok(true);
            }
        }
        Ok(false)
    }

    #[inline]
    pub(crate) fn block_producer_from_info(
        epoch_info: &EpochInfo,
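The new lookup relies on chunk_producers_settlement() being indexed by shard id, with each entry listing validator indices into the epoch info. Below is a minimal, self-contained Rust sketch of that walk, not part of the PR: plain slices and &str stand in for EpochInfo and AccountId, and the data is made up for illustration.

// Illustrative sketch only: `settlement[shard_id]` lists chunk-producer indices for
// that shard; the question is whether `account_id` is one of them.
fn cares_about_shard(
    settlement: &[Vec<usize>],   // chunk producers per shard (validator indices)
    validator_accounts: &[&str], // validator index -> account id
    account_id: &str,
    shard_id: usize,
) -> Result<bool, String> {
    let chunk_producers =
        settlement.get(shard_id).ok_or_else(|| format!("invalid shard id {shard_id}"))?;
    Ok(chunk_producers.iter().any(|&v| validator_accounts[v] == account_id))
}

fn main() {
    // Two shards: validators 0 and 2 produce chunks for shard 0, validator 1 for shard 1.
    let settlement = vec![vec![0, 2], vec![1]];
    let accounts = ["test0", "test1", "test2"];
    assert_eq!(cares_about_shard(&settlement, &accounts, "test0", 0), Ok(true));
    assert_eq!(cares_about_shard(&settlement, &accounts, "test0", 1), Ok(false));
    assert!(cares_about_shard(&settlement, &accounts, "test0", 5).is_err());
}

The out-of-range case in the sketch mirrors the EpochError::ShardingError path above.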
11 changes: 10 additions & 1 deletion chain/epoch-manager/src/shard_tracker.rs
@@ -10,9 +10,13 @@ use near_primitives::types::{AccountId, EpochId, ShardId};

#[derive(Clone)]
pub enum TrackedConfig {
    /// Tracks shards that contain one of the given accounts.
    Accounts(Vec<AccountId>),
    /// Tracks shards that are assigned to the given validator account.
    ShadowValidator(AccountId),
    /// Tracks all shards.
    AllShards,
    // Rotates between sets of shards to track.
    /// Rotates between sets of shards to track.
    Schedule(Vec<Vec<ShardId>>),
}

@@ -26,6 +30,8 @@ impl TrackedConfig {
            TrackedConfig::AllShards
        } else if !config.tracked_shard_schedule.is_empty() {
            TrackedConfig::Schedule(config.tracked_shard_schedule.clone())
        } else if let Some(account_id) = config.tracked_shadow_validator.as_ref() {
            TrackedConfig::ShadowValidator(account_id.clone())
        } else {
            TrackedConfig::Accounts(config.tracked_accounts.clone())
        }
@@ -90,6 +96,9 @@ impl ShardTracker {
                let subset = &schedule[index as usize];
                Ok(subset.contains(&shard_id))
            }
            TrackedConfig::ShadowValidator(account_id) => {
                self.epoch_manager.cares_about_shard_in_epoch(*epoch_id, account_id, shard_id)
Contributor:

Can we also give some warning somewhere (potentially here) if the account id is not eligible to be a chunk validator at all? I was thinking about doing it when loading the config, but at that point I am not sure if the node will have full info about the other validators.

@staffik (Contributor, Author), Jul 5, 2024:

Logging it here is spammy: it resulted in thousands of logs in a very short nayduck test.

            }
        }
    }

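One consequence of the selection logic in impl TrackedConfig above: the shadow-validator mode only takes effect when both tracked_shards and tracked_shard_schedule are empty, with tracked_accounts as the final fallback (note that the pytest below sets tracked_shards to [] before enabling it). A minimal Rust sketch of that precedence, not part of the PR; the structs are stand-ins and only the field names mirror ClientConfig.

// Stand-in types for illustration; only the field names mirror the real ClientConfig.
struct ClientConfigLike {
    tracked_shards: Vec<u64>,
    tracked_shard_schedule: Vec<Vec<u64>>,
    tracked_shadow_validator: Option<String>,
    tracked_accounts: Vec<String>,
}

#[derive(Debug, PartialEq)]
enum Mode {
    AllShards,
    Schedule,
    ShadowValidator(String),
    Accounts,
}

// Same precedence as the diff above: shards > schedule > shadow validator > accounts.
fn select_mode(config: &ClientConfigLike) -> Mode {
    if !config.tracked_shards.is_empty() {
        Mode::AllShards
    } else if !config.tracked_shard_schedule.is_empty() {
        Mode::Schedule
    } else if let Some(account_id) = config.tracked_shadow_validator.as_ref() {
        Mode::ShadowValidator(account_id.clone())
    } else {
        Mode::Accounts
    }
}

fn main() {
    // A shadow-tracking RPC node: no tracked shards, no schedule, shadow validator set.
    let config = ClientConfigLike {
        tracked_shards: vec![],
        tracked_shard_schedule: vec![],
        tracked_shadow_validator: Some("test0".to_string()),
        tracked_accounts: vec![],
    };
    assert_eq!(select_mode(&config), Mode::ShadowValidator("test0".to_string()));
}

In other words, setting tracked_shadow_validator on a node that also lists tracked_shards has no effect; such a node simply tracks all shards.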
3 changes: 3 additions & 0 deletions core/chain-configs/src/client_config.rs
@@ -410,6 +410,8 @@ pub struct ClientConfig {
    pub gc: GCConfig,
    /// Accounts that this client tracks.
    pub tracked_accounts: Vec<AccountId>,
    /// Track the shards that should be tracked by the given validator.
    pub tracked_shadow_validator: Option<AccountId>,
    /// Shards that this client tracks.
    pub tracked_shards: Vec<ShardId>,
    /// Rotate between these sets of tracked shards.
@@ -539,6 +541,7 @@ impl ClientConfig {
            block_header_fetch_horizon: 50,
            gc: GCConfig { gc_blocks_limit: 100, ..GCConfig::default() },
            tracked_accounts: vec![],
            tracked_shadow_validator: None,
            tracked_shards: vec![],
            tracked_shard_schedule: vec![],
            archive,
3 changes: 3 additions & 0 deletions nearcore/src/config.rs
@@ -222,6 +222,7 @@ pub struct Config {
    pub network: near_network::config_json::Config,
    pub consensus: Consensus,
    pub tracked_accounts: Vec<AccountId>,
    pub tracked_shadow_validator: Option<AccountId>,
    pub tracked_shards: Vec<ShardId>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tracked_shard_schedule: Option<Vec<Vec<ShardId>>>,
@@ -337,6 +338,7 @@ impl Default for Config {
            network: Default::default(),
            consensus: Consensus::default(),
            tracked_accounts: vec![],
            tracked_shadow_validator: None,
            tracked_shards: vec![],
            tracked_shard_schedule: None,
            archive: false,
@@ -557,6 +559,7 @@ impl NearConfig {
                doosmslug_step_period: config.consensus.doomslug_step_period,
                tracked_accounts: config.tracked_accounts,
                tracked_shards: config.tracked_shards,
                tracked_shadow_validator: config.tracked_shadow_validator,
                tracked_shard_schedule: config.tracked_shard_schedule.unwrap_or(vec![]),
                archive: config.archive,
                save_trie_changes: config.save_trie_changes.unwrap_or(!config.archive),
4 changes: 4 additions & 0 deletions nearcore/src/config_duration_test.rs
@@ -1,7 +1,10 @@
use std::str::FromStr;

use crate::config::Config;
use near_jsonrpc::RpcConfig;
use near_network::config_json::{ExperimentalConfig, NetworkConfigOverrides};
use near_o11y::testonly::init_test_logger;
use near_primitives::types::AccountId;
use near_store::StoreConfig;
use serde::ser::{
    SerializeMap, SerializeSeq, SerializeStruct, SerializeStructVariant, SerializeTuple,
@@ -40,6 +43,7 @@ fn test_config_duration_all_std() {
        rosetta_rpc: Some(Default::default()),
        save_trie_changes: Some(Default::default()),
        split_storage: Some(Default::default()),
        tracked_shadow_validator: Some(AccountId::from_str("test").unwrap()),
        tracked_shard_schedule: Some(Default::default()),
        transaction_pool_size_limit: Some(Default::default()),
        state_sync: Some(Default::default()),
2 changes: 2 additions & 0 deletions nightly/pytest-sanity.txt
@@ -114,6 +114,8 @@ pytest --timeout=240 sanity/validator_switch_key.py
pytest --timeout=240 sanity/validator_switch_key.py --features nightly
pytest --timeout=120 sanity/validator_switch_key_quick.py
pytest --timeout=120 sanity/validator_switch_key_quick.py --features nightly
pytest --timeout=60 sanity/shadow_tracking.py
pytest --timeout=60 sanity/shadow_tracking.py --features nightly
pytest sanity/proxy_simple.py
pytest sanity/proxy_simple.py --features nightly
pytest sanity/proxy_restart.py
108 changes: 108 additions & 0 deletions pytest/tests/sanity/shadow_tracking.py
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# Starts two validating nodes, one RPC node, and one dumper node.
# Sets the RPC node to shadow track one of the validators.
# Stops the RPC node for 1 epoch so that shard assignment changes.
# Restarts the RPC node and waits for state sync.
# Ensures the RPC node has chunks for the shards it is supposed to track as a shadow validator.
# Waits for 1 epoch so that shard assignment changes, then repeats the check, 3 times in total.

import unittest
import sys
import pathlib

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import start_cluster
import state_sync_lib
from utils import wait_for_blocks

EPOCH_LENGTH = 10
TIMEOUT = 100


class ShadowTrackingTest(unittest.TestCase):

    def _get_final_block_height(self, nodes):
        height_per_node = [node.get_latest_block().height for node in nodes]
        min_height = min(height_per_node)
        max_height = max(height_per_node)
        self.assertGreaterEqual(min_height + 1, max_height, height_per_node)
        return min_height

    def _get_block_hash(self, block_height, node):
        result = node.get_block_by_height(block_height)
        self.assertNotIn('error', result, result)
        self.assertIn('result', result, result)
        return result['result']['header']['hash']

    def _get_shard_assignment(self, rpc_node):
        result = rpc_node.json_rpc('validators', 'latest')
        self.assertNotIn('error', result, result)
        self.assertIn('result', result, result)
        validators = result['result']['current_validators']
        shard_assignment = {}
        for validator in validators:
            shard_assignment[validator['account_id']] = validator['shards']
        return shard_assignment

    def _has_chunk(self, block_hash, shard_id, node):
        result = node.json_rpc("chunk", {
            "block_id": block_hash,
            "shard_id": shard_id
        })
        if 'error' in result:
            return False
        self.assertIn('result', result, result)
        return True

    def test_shadow_tracking(self):
        node_config_dump, node_config_sync = state_sync_lib.get_state_sync_configs_pair(
        )
        node_config_sync["tracked_shards"] = []
        node_config_sync["store.load_mem_tries_for_tracked_shards"] = True
        configs = {x: node_config_sync for x in range(3)}
        configs[3] = node_config_dump

        # Set the RPC node to shadow track "test0".
        configs[2]["tracked_shadow_validator"] = "test0"

        nodes = start_cluster(
            2, 2, 3, None,
            [["epoch_length", EPOCH_LENGTH],
             ["shuffle_shard_assignment_for_chunk_producers", True],
             ["block_producer_kickout_threshold", 20],
             ["chunk_producer_kickout_threshold", 20]], configs)

        for node in nodes:
            node.stop_checking_store()

        # Wait for 1 epoch so that shard shuffling kicks in.
        wait_for_blocks(nodes[3], count=EPOCH_LENGTH)
        logger.info('## Initial shard assignment: {}'.format(
            self._get_shard_assignment(nodes[3])))

        # Stop the RPC node for 1 epoch, so that it has to state sync to a new shard tracked by "test0".
        nodes[2].kill()
        wait_for_blocks(nodes[3], count=EPOCH_LENGTH)
        nodes[2].start(boot_node=nodes[3])
        # Give it some time to catch up.
        wait_for_blocks(nodes[3], count=EPOCH_LENGTH // 2)

        round = 0
        while True:
            round += 1
            shards = self._get_shard_assignment(nodes[3])
            logger.info(f'## Round {round} shard assignment: {shards}')
            block_height = self._get_final_block_height(nodes)
            block_hash = self._get_block_hash(block_height, nodes[3])
            for shard in shards['test0']:
                # The RPC node should have the chunk for each shard tracked by "test0".
                self.assertTrue(self._has_chunk(block_hash, shard, nodes[2]))
            if round == 3:
                break
            wait_for_blocks(nodes[3], count=EPOCH_LENGTH)


if __name__ == '__main__':
    unittest.main()
4 changes: 2 additions & 2 deletions pytest/tests/sanity/validator_switch_key_quick.py
@@ -26,7 +26,7 @@ def test_validator_switch_key_quick(self):
        # that it will be assigned to when becoming a validator.
        config_map = {
            2: {
                "tracked_shards": [0],
                "tracked_shadow_validator": "test0",
                "store.load_mem_tries_for_tracked_shards": True,
            }
        }
@@ -42,7 +42,7 @@ def test_validator_switch_key_quick(self):
["block_producer_kickout_threshold", 10],
["chunk_producer_kickout_threshold", 10]],
config_map)
wait_for_blocks(old_validator, count=2)
wait_for_blocks(old_validator, count=5)

new_validator.reset_validator_key(other_validator.validator_key)
other_validator.kill()