Skip to content

Commit

Permalink
link the remove_collections endpoint to the garbage collections proce…
Browse files Browse the repository at this point in the history
…ss (#378)
  • Loading branch information
akichidis authored Jun 27, 2022
1 parent 41b83b2 commit 9f5c6c3
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 18 deletions.
7 changes: 4 additions & 3 deletions narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl Node {
State: ExecutionState + Send + Sync + 'static,
{
let (tx_new_certificates, rx_new_certificates) = channel(Self::CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(Self::CHANNEL_CAPACITY);
let (tx_consensus, rx_consensus) = channel(Self::CHANNEL_CAPACITY);

// Compute the public key of this authority.
let name = keypair.public().clone();
Expand All @@ -132,7 +132,7 @@ impl Node {
parameters.clone(),
execution_state,
rx_new_certificates,
tx_feedback,
tx_consensus.clone(),
tx_confirmation,
)
.await?;
Expand All @@ -149,9 +149,10 @@ impl Node {
store.certificate_store.clone(),
store.payload_store.clone(),
/* tx_consensus */ tx_new_certificates,
/* rx_consensus */ rx_feedback,
/* rx_consensus */ rx_consensus,
/* dag */ dag,
network_model,
tx_consensus,
);

Ok(primary_handle)
Expand Down
16 changes: 16 additions & 0 deletions narwhal/primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ pub struct DeleteBatchMessage {
///
/// let (tx_commands, rx_commands) = channel(1);
/// let (tx_delete_batches, rx_delete_batches) = channel(1);
/// let (tx_removed_certificates, _rx_removed_certificates) = channel(1);
/// let (tx_delete_block_result, mut rx_delete_block_result) = channel(1);
///
/// let name = Ed25519PublicKey::default();
Expand All @@ -142,6 +143,7 @@ pub struct DeleteBatchMessage {
/// PrimaryToWorkerNetwork::default(),
/// rx_commands,
/// rx_delete_batches,
/// tx_removed_certificates,
/// );
///
/// // A dummy certificate
Expand Down Expand Up @@ -211,6 +213,9 @@ pub struct BlockRemover<PublicKey: VerifyingKey> {
// TODO: Change to a oneshot channel instead of an mpsc channel
/// Receives all the responses to the requests to delete a batch.
rx_delete_batches: Receiver<DeleteBatchResult>,

/// Outputs all the successfully deleted certificates
tx_removed_certificates: Sender<Certificate<PublicKey>>,
}

impl<PublicKey: VerifyingKey> BlockRemover<PublicKey> {
Expand All @@ -224,6 +229,7 @@ impl<PublicKey: VerifyingKey> BlockRemover<PublicKey> {
worker_network: PrimaryToWorkerNetwork,
rx_commands: Receiver<BlockRemoverCommand>,
rx_delete_batches: Receiver<DeleteBatchResult>,
removed_certificates: Sender<Certificate<PublicKey>>,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
Expand All @@ -239,6 +245,7 @@ impl<PublicKey: VerifyingKey> BlockRemover<PublicKey> {
map_tx_removal_results: HashMap::new(),
map_tx_worker_removal_results: HashMap::new(),
rx_delete_batches,
tx_removed_certificates: removed_certificates,
}
.run()
.await;
Expand Down Expand Up @@ -375,11 +382,20 @@ impl<PublicKey: VerifyingKey> BlockRemover<PublicKey> {
if let Some(dag) = &self.dag {
dag.remove(&certificate_ids).await.map_err(Either::Right)?
}

self.certificate_store
.remove_all(certificate_ids)
.await
.map_err(Either::Left)?;

// Now output all the removed certificates
for certificate in certificates.clone() {
self.tx_removed_certificates
.send(certificate)
.await
.expect("Couldn't forward removed certificates to channel");
}

debug!("Successfully cleaned up certificates: {:?}", certificates);

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl Primary {
rx_consensus: Receiver<Certificate<PublicKey>>,
dag: Option<Arc<Dag<PublicKey>>>,
network_model: NetworkModel,
tx_committed_certificates: Sender<Certificate<PublicKey>>,
) -> JoinHandle<()> {
let (tx_others_digests, rx_others_digests) = channel(CHANNEL_CAPACITY);
let (tx_our_digests, rx_our_digests) = channel(CHANNEL_CAPACITY);
Expand Down Expand Up @@ -244,6 +245,7 @@ impl Primary {
PrimaryToWorkerNetwork::default(),
rx_block_removal_commands,
rx_batch_removal,
tx_committed_certificates,
);

// Responsible for finding missing blocks (certificates) and fetching
Expand Down
20 changes: 19 additions & 1 deletion narwhal/primary/src/tests/block_remover_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async fn test_successful_blocks_delete() {
// GIVEN
let (header_store, certificate_store, payload_store) = create_db_stores();
let (_tx_consensus, rx_consensus) = channel(1);
let (tx_removed_certificates, mut rx_removed_certificates) = channel(10);
let (tx_commands, rx_commands) = channel(10);
let (tx_remove_block, mut rx_remove_block) = channel(1);
let (tx_delete_batches, rx_delete_batches) = channel(10);
Expand All @@ -54,6 +55,7 @@ async fn test_successful_blocks_delete() {
PrimaryToWorkerNetwork::default(),
rx_commands,
rx_delete_batches,
tx_removed_certificates,
);

let mut block_ids = Vec::new();
Expand Down Expand Up @@ -157,7 +159,7 @@ async fn test_successful_blocks_delete() {
assert_eq!(block.ids.len(), block_ids.len());

// ensure that certificates have been deleted from store
for block_id in block_ids {
for block_id in block_ids.clone() {
assert!(certificate_store.read(block_id).await.unwrap().is_none(), "Certificate shouldn't exist");
}

Expand All @@ -177,6 +179,18 @@ async fn test_successful_blocks_delete() {
panic!("Timeout, no result has been received in time")
}
}

// ensure deleted certificates have been populated to output channel
let mut total_deleted = 0;
while let Ok(Some(c)) = timeout(Duration::from_secs(1), rx_removed_certificates.recv()).await {
assert!(
block_ids.contains(&c.digest()),
"Deleted certificate not found"
);
total_deleted += 1;
}

assert_eq!(total_deleted, block_ids.len());
}

#[tokio::test]
Expand All @@ -187,6 +201,7 @@ async fn test_timeout() {
let (tx_remove_block, mut rx_remove_block) = channel(1);
let (_tx_consensus, rx_consensus) = channel(1);
let (tx_delete_batches, rx_delete_batches) = channel(10);
let (tx_removed_certificates, _rx_removed_certificates) = channel(10);

// AND the necessary keys
let (name, committee) = resolve_name_and_committee();
Expand All @@ -204,6 +219,7 @@ async fn test_timeout() {
PrimaryToWorkerNetwork::default(),
rx_commands,
rx_delete_batches,
tx_removed_certificates,
);

let mut block_ids = Vec::new();
Expand Down Expand Up @@ -318,6 +334,7 @@ async fn test_unlocking_pending_requests() {
let (tx_commands, rx_commands) = channel(10);
let (_tx_consensus, rx_consensus) = channel(1);
let (tx_delete_batches, rx_delete_batches) = channel(10);
let (tx_removed_certificates, _rx_removed_certificates) = channel(10);

// AND the necessary keys
let (name, committee) = resolve_name_and_committee();
Expand All @@ -338,6 +355,7 @@ async fn test_unlocking_pending_requests() {
map_tx_removal_results: HashMap::new(),
map_tx_worker_removal_results: HashMap::new(),
rx_delete_batches,
tx_removed_certificates,
};

let mut block_ids = Vec::new();
Expand Down
6 changes: 4 additions & 2 deletions narwhal/primary/tests/integration_tests_configuration_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn test_new_epoch() {
let store = NodeStorage::reopen(temp_dir());

let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
let (_tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);

Primary::spawn(
name.clone(),
Expand All @@ -44,6 +44,7 @@ async fn test_new_epoch() {
/* rx_consensus */ rx_feedback,
/* dag */ Some(Arc::new(Dag::new(&committee, rx_new_certificates).1)),
NetworkModel::Asynchronous,
tx_feedback,
);

// Wait for tasks to start
Expand Down Expand Up @@ -90,7 +91,7 @@ async fn test_new_network_info() {
let store = NodeStorage::reopen(temp_dir());

let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
let (_tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);

Primary::spawn(
name.clone(),
Expand All @@ -104,6 +105,7 @@ async fn test_new_network_info() {
/* rx_consensus */ rx_feedback,
/* dag */ Some(Arc::new(Dag::new(&committee, rx_new_certificates).1)),
NetworkModel::Asynchronous,
/* tx_committed_certificates */ tx_feedback,
);

// Wait for tasks to start
Expand Down
12 changes: 8 additions & 4 deletions narwhal/primary/tests/integration_tests_proposer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn test_rounds_errors() {

// Spawn the primary
let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
let (_tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);

// AND create a committee passed exclusively to the DAG that does not include the name public key
// In this way, the genesis certificate is not run for that authority and is absent when we try to fetch it
Expand Down Expand Up @@ -108,6 +108,7 @@ async fn test_rounds_errors() {
Dag::new(&no_name_committee, rx_new_certificates).1,
)),
NetworkModel::Asynchronous,
tx_feedback,
);

// AND Wait for tasks to start
Expand Down Expand Up @@ -156,7 +157,7 @@ async fn test_rounds_return_successful_response() {

// Spawn the primary
let (tx_new_certificates, rx_new_certificates) = channel(CHANNEL_CAPACITY);
let (_tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);

// AND setup the DAG
let dag = Arc::new(Dag::new(&committee, rx_new_certificates).1);
Expand All @@ -173,6 +174,7 @@ async fn test_rounds_return_successful_response() {
/* rx_consensus */ rx_feedback,
/* external_consensus */ Some(dag.clone()),
NetworkModel::Asynchronous,
tx_feedback,
);

// AND Wait for tasks to start
Expand Down Expand Up @@ -286,7 +288,7 @@ async fn test_node_read_causal_signed_certificates() {
.await
.unwrap();

let (_tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);
let (tx_feedback, rx_feedback) = channel(CHANNEL_CAPACITY);

let primary_1_parameters = Parameters {
batch_size: 200, // Two transactions.
Expand All @@ -308,10 +310,11 @@ async fn test_node_read_causal_signed_certificates() {
/* rx_consensus */ rx_feedback,
/* dag */ Some(dag.clone()),
NetworkModel::Asynchronous,
tx_feedback,
);

let (tx_new_certificates_2, rx_new_certificates_2) = channel(CHANNEL_CAPACITY);
let (_tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY);
let (tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY);

let primary_2_parameters = Parameters {
batch_size: 200, // Two transactions.
Expand All @@ -334,6 +337,7 @@ async fn test_node_read_causal_signed_certificates() {
/* external_consensus */
Some(Arc::new(Dag::new(&committee, rx_new_certificates_2).1)),
NetworkModel::Asynchronous,
tx_feedback_2,
);

// Wait for tasks to start
Expand Down
Loading

0 comments on commit 9f5c6c3

Please sign in to comment.