Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metric for event cache and other missing metrics #4863

Merged
merged 8 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions massa-async-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
Expand Down Expand Up @@ -1142,6 +1143,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
Expand Down Expand Up @@ -1204,6 +1206,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
Expand Down Expand Up @@ -1261,6 +1264,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
Expand Down Expand Up @@ -1318,6 +1322,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Box::new(MassaDB::new(db_config)) as Box<(dyn MassaDBController + 'static)>,
Expand Down Expand Up @@ -1368,6 +1373,7 @@ mod tests {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let db: ShareableMassaDBController = Arc::new(RwLock::new(Box::new(MassaDB::new(
db_config.clone(),
Expand Down
3 changes: 3 additions & 0 deletions massa-bootstrap/src/tests/binders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ fn test_staying_connected_without_message_trigger_read_timeout() {
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
Expand Down Expand Up @@ -549,6 +550,7 @@ fn test_staying_connected_pass_handshake_but_deadline_after() {
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
Expand Down Expand Up @@ -647,6 +649,7 @@ fn test_staying_connected_pass_handshake_but_deadline_during_data_exchange() {
max_versioning_elements_size: 100_000_000,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
}))
as Box<(dyn MassaDBController + 'static)>));
let rolls_path = PathBuf::from_str("../massa-node/base_config/initial_rolls.json").unwrap();
Expand Down
1 change: 1 addition & 0 deletions massa-bootstrap/src/tests/universe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl TestUniverse for BootstrapClientTestUniverse {
max_final_state_elements_size: MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE as usize,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
}))
as Box<(dyn MassaDBController + 'static)>));
controllers
Expand Down
1 change: 1 addition & 0 deletions massa-bootstrap/src/tests/universe_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl BootstrapServerForeignControllers {
max_final_state_elements_size: MAX_BOOTSTRAP_FINAL_STATE_PARTS_SIZE as usize,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
}))
as Box<(dyn MassaDBController + 'static)>));
Self {
Expand Down
3 changes: 3 additions & 0 deletions massa-db-exports/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub trait MassaDBController: Send + Sync + Debug {
last_change_id: Option<Slot>,
) -> Result<StreamBatch<Slot>, MassaDBError>;

/// Get the total size of the change history and the change versioning history respectively
fn get_change_history_sizes(&self) -> (usize, usize);

/// Used in test to compare a prebuilt ledger with a ledger that has been built by the code
#[cfg(feature = "test-exports")]
fn get_entire_database(&self) -> Vec<BTreeMap<Vec<u8>, Vec<u8>>>;
Expand Down
2 changes: 2 additions & 0 deletions massa-db-exports/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ pub struct MassaDBConfig {
pub thread_count: u8,
/// Maximum number of ledger backups to keep
pub max_ledger_backups: u64,
/// Enable metrics
pub enable_metrics: bool,
}
81 changes: 72 additions & 9 deletions massa-db-worker/src/massa_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct RawMassaDB<
pub change_id_deserializer: ChangeIDDeserializer,
/// The current RocksDB batch of the database, in a Mutex to share it
pub current_batch: Arc<Mutex<WriteBatch>>,
/// If metrics are enabled, we keep track of the size of the changes associated to each change_id
pub change_history_sizes: BTreeMap<ChangeID, (usize, usize)>,
}

impl<ChangeID, ChangeIDSerializer, ChangeIDDeserializer> std::fmt::Debug
Expand Down Expand Up @@ -389,15 +391,51 @@ where
}
}

// in versioning_changes, we have the data that we do not want to include in hash
// e.g everything that is not in 'Active' state (so hashes remain compatibles)
for (key, value) in versioning_changes.iter() {
if let Some(value) = value {
self.current_batch
.lock()
.put_cf(handle_versioning, key, value);
} else {
self.current_batch.lock().delete_cf(handle_versioning, key);
// If metrics are enabled, we keep track of the size of the changes (for state changes, and then for versioning changes)
if self.config.enable_metrics {
let changes_size;
let changes_versioning_size;
{
let mut current_batch_guard = self.current_batch.lock();
changes_size = current_batch_guard.size_in_bytes();

// in versioning_changes, we have the data that we do not want to include in hash
// e.g everything that is not in 'Active' state (so hashes remain compatibles)
for (key, value) in versioning_changes.iter() {
if let Some(value) = value {
current_batch_guard.put_cf(handle_versioning, key, value);
} else {
current_batch_guard.delete_cf(handle_versioning, key);
}
}

changes_versioning_size = current_batch_guard
.size_in_bytes()
.saturating_sub(changes_size);
}

match self
.change_history_sizes
.entry(self.get_change_id().expect(CHANGE_ID_DESER_ERROR))
{
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert((changes_size, changes_versioning_size));
}
std::collections::btree_map::Entry::Occupied(mut entry) => {
entry.get_mut().0 += changes_size;
entry.get_mut().1 += changes_versioning_size;
}
}
} else {
let mut current_batch_guard = self.current_batch.lock();
// in versioning_changes, we have the data that we do not want to include in hash
// e.g everything that is not in 'Active' state (so hashes remain compatibles)
for (key, value) in versioning_changes.iter() {
if let Some(value) = value {
current_batch_guard.put_cf(handle_versioning, key, value);
} else {
current_batch_guard.delete_cf(handle_versioning, key);
}
}
}

Expand Down Expand Up @@ -447,10 +485,12 @@ where
if reset_history {
self.change_history.clear();
self.change_history_versioning.clear();
self.change_history_sizes.clear();
}

while self.change_history.len() > self.config.max_history_length {
self.change_history.pop_first();
self.change_history_sizes.pop_first();
}

while self.change_history_versioning.len() > self.config.max_history_length {
Expand Down Expand Up @@ -614,6 +654,7 @@ impl RawMassaDB<Slot, SlotSerializer, SlotDeserializer> {
config,
change_history: BTreeMap::new(),
change_history_versioning: BTreeMap::new(),
change_history_sizes: BTreeMap::new(),
change_id_serializer: SlotSerializer::new(),
change_id_deserializer,
current_batch,
Expand Down Expand Up @@ -851,6 +892,15 @@ impl MassaDBController for RawMassaDB<Slot, SlotSerializer, SlotDeserializer> {
self.get_versioning_batch_to_stream(last_versioning_step, last_change_id)
}

fn get_change_history_sizes(&self) -> (usize, usize) {
self.change_history_sizes.values().fold(
(0, 0),
|(acc_state, acc_version), &(state, version)| {
(acc_state + state, acc_version + version)
},
)
}

#[cfg(feature = "test-exports")]
fn get_entire_database(&self) -> Vec<BTreeMap<Vec<u8>, Vec<u8>>> {
let handle_state = self.db.cf_handle(STATE_CF).expect(CF_ERROR);
Expand Down Expand Up @@ -933,6 +983,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -963,6 +1014,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1047,6 +1099,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1132,6 +1185,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1181,6 +1235,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_backup_1_opts = MassaDB::default_db_opts();
db_backup_1_opts.create_if_missing(false);
Expand All @@ -1205,6 +1260,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_backup_2_opts = MassaDB::default_db_opts();
db_backup_2_opts.create_if_missing(false);
Expand Down Expand Up @@ -1238,6 +1294,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1285,6 +1342,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
// let db_backup_2_opts = MassaDB::default_db_opts();

Expand Down Expand Up @@ -1334,6 +1392,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1427,6 +1486,7 @@ mod test {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1515,6 +1575,7 @@ mod test {
max_versioning_elements_size: 10,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1600,6 +1661,7 @@ mod test {
max_versioning_elements_size: 20,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};
let mut db_opts = MassaDB::default_db_opts();
// Additional checks (only for testing)
Expand Down Expand Up @@ -1712,6 +1774,7 @@ mod test {
max_versioning_elements_size: 20,
thread_count: THREAD_COUNT,
max_ledger_backups: 10,
enable_metrics: false,
};

let slot_1 = Slot::new(1, 0);
Expand Down
2 changes: 2 additions & 0 deletions massa-deferred-calls/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn call_registry_apply_changes() {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let call_id_serializer = DeferredCallIdSerializer::new();
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Expand Down Expand Up @@ -75,6 +76,7 @@ fn call_registry_get_slot_calls() {
max_versioning_elements_size: 100,
thread_count: THREAD_COUNT,
max_ledger_backups: 100,
enable_metrics: false,
};
let call_id_serializer = DeferredCallIdSerializer::new();
let db: ShareableMassaDBController = Arc::new(RwLock::new(
Expand Down
35 changes: 16 additions & 19 deletions massa-event-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@ version = "0.1.0"
edition = "2021"

[features]
test-exports = [
"massa_models/test-exports",
"mockall",
"mockall_wrap"
]
test-exports = ["massa_models/test-exports", "mockall", "mockall_wrap"]


[dependencies]
nom = {workspace = true}
rocksdb = {workspace = true}
tracing = {workspace = true}
nom = { workspace = true }
rocksdb = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
num_enum = { workspace = true }
massa_models = {workspace = true}
massa_serialization = {workspace = true}
massa_time = {workspace = true}
mockall = {workspace = true, optional = true}
mockall_wrap = {workspace = true, optional = true}
massa_metrics = { workspace = true }
massa_models = { workspace = true }
massa_serialization = { workspace = true }
massa_time = { workspace = true }
mockall = { workspace = true, optional = true }
mockall_wrap = { workspace = true, optional = true }

[dev-dependencies]
tempfile = {workspace = true}
serial_test = {workspace = true}
more-asserts = {workspace = true}
rand = {workspace = true}
mockall = {workspace = true}
mockall_wrap = {workspace = true}
tempfile = { workspace = true }
serial_test = { workspace = true }
more-asserts = { workspace = true }
rand = { workspace = true }
mockall = { workspace = true }
mockall_wrap = { workspace = true }
massa_models = { workspace = true, features = ["test-exports"] }
1 change: 1 addition & 0 deletions massa-event-cache/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl EventCacheController for EventCacheControllerImpl {
// lock input data
let mut input_data = self.input_data.1.lock();
input_data.events.extend(events);
massa_metrics::set_event_cache_vec_len(input_data.events.len());
// Wake up the condvar in EventCacheWriterThread waiting for events
self.input_data.0.notify_all();
}
Expand Down
Loading
Loading