Skip to content

Commit

Permalink
Expose workers handles (MystenLabs#428)
Browse files Browse the repository at this point in the history
* Expose primary handles

* Expose workers handles
  • Loading branch information
asonnino authored Jul 2, 2022
1 parent af56e79 commit d043fbd
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 73 deletions.
11 changes: 6 additions & 5 deletions narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Node {
tx_confirmation: Sender<(SubscriberResult<Vec<u8>>, SerializedTransaction)>,
// A prometheus exporter Registry to use for the metrics
registry: &Registry,
) -> SubscriberResult<JoinHandle<()>>
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
PublicKey: VerifyingKey,
Keys: KeyPair<PubKey = PublicKey> + Signer<PublicKey::Sig> + Send + 'static,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl Node {
};

// Spawn the primary.
let primary_handle = Primary::spawn(
let primary_handles = Primary::spawn(
name.clone(),
keypair,
committee.clone(),
Expand All @@ -161,7 +161,7 @@ impl Node {
registry,
);

Ok(primary_handle)
Ok(primary_handles)
}

/// Spawn the consensus core and the client executing transactions.
Expand Down Expand Up @@ -243,13 +243,14 @@ impl Node {
let mut handles = Vec::new();

for id in ids {
handles.push(Worker::spawn(
let worker_handles = Worker::spawn(
name.clone(),
id,
committee.clone(),
parameters.clone(),
store.batch_store.clone(),
));
);
handles.extend(worker_handles);
}
handles
}
Expand Down
5 changes: 2 additions & 3 deletions narwhal/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
("primary", Some(sub_matches)) => {
registry = primary_metrics_registry(keypair.public().clone());

let handle = Node::spawn_primary(
Node::spawn_primary(
keypair,
committee,
&store,
Expand All @@ -163,8 +163,7 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
tx_transaction_confirmation,
&registry,
)
.await?;
vec![handle]
.await?
}

// Spawn a single worker.
Expand Down
5 changes: 3 additions & 2 deletions narwhal/primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio::{
mpsc::{channel, Receiver, Sender},
watch,
},
task::JoinHandle,
time::{sleep, timeout},
};
use tracing::{debug, error, instrument, trace, warn};
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<PublicKey: VerifyingKey> BlockSynchronizer<PublicKey> {
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
certificate_store: Store<CertificateDigest, Certificate<PublicKey>>,
parameters: BlockSynchronizerParameters,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
name,
Expand All @@ -228,7 +229,7 @@ impl<PublicKey: VerifyingKey> BlockSynchronizer<PublicKey> {
}
.run()
.await;
});
})
}

pub async fn run(&mut self) {
Expand Down
5 changes: 3 additions & 2 deletions narwhal/primary/src/block_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
};
use tokio::{
sync::{mpsc::Receiver, oneshot, watch},
task::JoinHandle,
time::timeout,
};
use tracing::{debug, error, instrument, warn};
Expand Down Expand Up @@ -257,7 +258,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
rx_commands: Receiver<BlockCommand>,
batch_receiver: Receiver<BatchResult>,
block_synchronizer_handler: Arc<SynchronizerHandler>,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
let shutdown_token = Self {
name,
Expand All @@ -275,7 +276,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
.run()
.await;
drop(shutdown_token);
});
})
}

async fn run(&mut self) -> ShutdownToken {
Expand Down
13 changes: 8 additions & 5 deletions narwhal/primary/src/certificate_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::{
},
};
use store::Store;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
watch,
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
watch,
},
task::JoinHandle,
};
use tracing::error;
use types::{
Expand Down Expand Up @@ -58,7 +61,7 @@ impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
rx_reconfigure: watch::Receiver<Reconfigure<PublicKey>>,
rx_synchronizer: Receiver<Certificate<PublicKey>>,
tx_core: Sender<Certificate<PublicKey>>,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
committee,
Expand All @@ -72,7 +75,7 @@ impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
}
.run()
.await;
});
})
}

/// Helper function. It waits for particular data to become available in the storage and then
Expand Down
5 changes: 3 additions & 2 deletions narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::{
mpsc::{channel, Receiver, Sender},
watch,
},
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error};
Expand Down Expand Up @@ -100,7 +101,7 @@ impl<PublicKey: VerifyingKey> HeaderWaiter<PublicKey> {
rx_reconfigure: watch::Receiver<Reconfigure<PublicKey>>,
rx_synchronizer: Receiver<WaiterMessage<PublicKey>>,
tx_core: Sender<Header<PublicKey>>,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
name,
Expand All @@ -122,7 +123,7 @@ impl<PublicKey: VerifyingKey> HeaderWaiter<PublicKey> {
}
.run()
.await;
});
})
}

/// Update the committee and cleanup internal state.
Expand Down
9 changes: 6 additions & 3 deletions narwhal/primary/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use crypto::traits::{EncodeDecodeBase64, VerifyingKey};
use network::PrimaryNetwork;
use store::{Store, StoreError};
use thiserror::Error;
use tokio::sync::{mpsc::Receiver, watch};
use tokio::{
sync::{mpsc::Receiver, watch},
task::JoinHandle,
};
use tracing::{error, instrument};
use types::{BatchDigest, Certificate, CertificateDigest, ShutdownToken};

Expand Down Expand Up @@ -57,7 +60,7 @@ impl<PublicKey: VerifyingKey> Helper<PublicKey> {
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
rx_committee: watch::Receiver<Reconfigure<PublicKey>>,
rx_primaries: Receiver<PrimaryMessage<PublicKey>>,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
let shutdown_token = Self {
name,
Expand All @@ -71,7 +74,7 @@ impl<PublicKey: VerifyingKey> Helper<PublicKey> {
.run()
.await;
drop(shutdown_token);
});
})
}

async fn run(&mut self) -> ShutdownToken {
Expand Down
10 changes: 4 additions & 6 deletions narwhal/primary/src/payload_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::primary::PayloadToken;
use config::WorkerId;

use store::Store;
use tokio::sync::mpsc::Receiver;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use types::BatchDigest;

use crate::primary::PayloadToken;

/// Receives batches' digests of other authorities. These are only needed to verify incoming
/// headers (i.e.. make sure we have their payload).
pub struct PayloadReceiver {
Expand All @@ -22,10 +20,10 @@ impl PayloadReceiver {
pub fn spawn(
store: Store<(BatchDigest, WorkerId), PayloadToken>,
rx_workers: Receiver<(BatchDigest, WorkerId)>,
) {
) -> JoinHandle<()> {
tokio::spawn(async move {
Self { store, rx_workers }.run().await;
});
})
}

async fn run(&mut self) {
Expand Down
35 changes: 23 additions & 12 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl Primary {
network_model: NetworkModel,
tx_committed_certificates: Sender<ConsensusPrimaryMessage<PublicKey>>,
registry: &Registry,
) -> JoinHandle<()> {
) -> Vec<JoinHandle<()>> {
let initial_committee = Reconfigure::NewCommittee((&*committee).clone());
let (tx_reconfigure, rx_reconfigure) = watch::channel(initial_committee);

Expand Down Expand Up @@ -208,7 +208,7 @@ impl Primary {
let signature_service = SignatureService::new(signer);

// The `Core` receives and handles headers, votes, and certificates from the other primaries.
let primary_handle = Core::spawn(
let core_handle = Core::spawn(
name.clone(),
(&*committee).clone(),
header_store.clone(),
Expand All @@ -228,7 +228,7 @@ impl Primary {
);

// Receives batch digests from other workers. They are only used to validate headers.
PayloadReceiver::spawn(
let payload_receiver_handle = PayloadReceiver::spawn(
payload_store.clone(),
/* rx_workers */ rx_others_digests,
);
Expand All @@ -244,7 +244,7 @@ impl Primary {

// Retrieves a block's data by contacting the worker nodes that contain the
// underlying batches and their transactions.
BlockWaiter::spawn(
let block_waiter_handle = BlockWaiter::spawn(
name.clone(),
(&*committee).clone(),
tx_reconfigure.subscribe(),
Expand All @@ -257,7 +257,7 @@ impl Primary {
let internal_consensus = dag.is_none();

// Orchestrates the removal of blocks across the primary and worker nodes.
BlockRemover::spawn(
let block_remover_handle = BlockRemover::spawn(
name.clone(),
(&*committee).clone(),
certificate_store.clone(),
Expand All @@ -273,7 +273,7 @@ impl Primary {

// Responsible for finding missing blocks (certificates) and fetching
// them from the primary peers by synchronizing also their batches.
BlockSynchronizer::spawn(
let block_synchronizer_handle = BlockSynchronizer::spawn(
name.clone(),
(&*committee).clone(),
tx_reconfigure.subscribe(),
Expand All @@ -289,7 +289,7 @@ impl Primary {
// Whenever the `Synchronizer` does not manage to validate a header due to missing parent certificates of
// batch digests, it commands the `HeaderWaiter` to synchronize with other nodes, wait for their reply, and
// re-schedule execution of the header once we have all missing data.
HeaderWaiter::spawn(
let header_waiter_handle = HeaderWaiter::spawn(
name.clone(),
(&*committee).clone(),
certificate_store.clone(),
Expand All @@ -305,7 +305,7 @@ impl Primary {

// The `CertificateWaiter` waits to receive all the ancestors of a certificate before looping it back to the
// `Core` for further processing.
CertificateWaiter::spawn(
let certificate_waiter_handle = CertificateWaiter::spawn(
(&*committee).clone(),
certificate_store.clone(),
consensus_round.clone(),
Expand All @@ -317,7 +317,7 @@ impl Primary {

// When the `Core` collects enough parent certificates, the `Proposer` generates a new header with new batch
// digests from our workers and sends it back to the `Core`.
Proposer::spawn(
let proposer_handle = Proposer::spawn(
name.clone(),
(&*committee).clone(),
signature_service,
Expand All @@ -332,7 +332,7 @@ impl Primary {

// The `Helper` is dedicated to reply to certificates & payload availability requests
// from other primaries.
Helper::spawn(
let helper_handle = Helper::spawn(
name.clone(),
(&*committee).clone(),
certificate_store,
Expand All @@ -357,7 +357,7 @@ impl Primary {
}

// Keeps track of the latest consensus round and allows other tasks to clean up their their internal state
StateHandler::spawn(
let state_handler_handle = StateHandler::spawn(
name.clone(),
committee.clone(),
consensus_round,
Expand All @@ -375,7 +375,18 @@ impl Primary {
.primary_to_primary
);

primary_handle
vec![
core_handle,
payload_receiver_handle,
block_synchronizer_handle,
block_waiter_handle,
block_remover_handle,
header_waiter_handle,
certificate_waiter_handle,
proposer_handle,
helper_handle,
state_handler_handle,
]
}
}

Expand Down
5 changes: 3 additions & 2 deletions narwhal/primary/src/proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::{
mpsc::{Receiver, Sender},
watch,
},
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::debug;
Expand Down Expand Up @@ -71,7 +72,7 @@ impl<PublicKey: VerifyingKey> Proposer<PublicKey> {
rx_core: Receiver<(Vec<Certificate<PublicKey>>, Round, Epoch)>,
rx_workers: Receiver<(BatchDigest, WorkerId)>,
tx_core: Sender<Header<PublicKey>>,
) {
) -> JoinHandle<()> {
let genesis = Certificate::genesis(&committee);
tokio::spawn(async move {
Self {
Expand All @@ -93,7 +94,7 @@ impl<PublicKey: VerifyingKey> Proposer<PublicKey> {
}
.run()
.await;
});
})
}

async fn make_header(&mut self) -> DagResult<()> {
Expand Down
Loading

0 comments on commit d043fbd

Please sign in to comment.