Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fastx client] Add logic for a client to sync a cert to an authority, and two authorities. #285

Merged
merged 8 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ impl AuthorityState {
);

let object = object.ok_or(FastPayError::ObjectNotFound { object_id })?;
fp_ensure!(
object.digest() == object_digest,
FastPayError::InvalidObjectDigest {
object_id,
expected_digest: object_digest
}
);

// Check that the seq number is the same
fp_ensure!(
Expand All @@ -111,6 +104,15 @@ impl AuthorityState {
}
);

// Check the digest matches
fp_ensure!(
object.digest() == object_digest,
FastPayError::InvalidObjectDigest {
object_id,
expected_digest: object_digest
}
);

if object.is_read_only() {
// For a tranfer order, the object to be transferred
// must not be read only.
Expand Down Expand Up @@ -171,6 +173,12 @@ impl AuthorityState {
// Check the certificate and retrieve the transfer data.
certificate.check(&self.committee)?;

// Ensure an idempotent answer
let order_info = self.make_order_info(&transaction_digest).await?;
if order_info.certified_order.is_some() {
return Ok(order_info);
}

let input_objects = order.input_objects();
let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect();
// Get a copy of the object.
Expand All @@ -197,6 +205,10 @@ impl AuthorityState {
current_sequence_number: input_sequence_number
});
}

// Note: this should never be true in prod, but some tests
// (test_handle_confirmation_order_bad_sequence_number) do
// a poor job of setting up the DB.
Comment on lines +209 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we open an issue for that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok we can just kill the broken test.

if input_sequence_number > input_seq {
// Transfer was already confirmed.
return self.make_order_info(&transaction_digest).await;
Expand All @@ -210,7 +222,6 @@ impl AuthorityState {
}

// Insert into the certificates map
let transaction_digest = certificate.order.digest();
let mut tx_ctx = TxContext::new(order.sender(), transaction_digest);

let gas_object_id = *order.gas_payment_object_id();
Expand Down Expand Up @@ -321,11 +332,15 @@ impl AuthorityState {
let requested_certificate = if let Some(seq) = request.request_sequence_number {
// Get the Transaction Digest that created the object
let parent_iterator = self
.get_parent_iterator(request.object_id, Some(seq.increment()))
.get_parent_iterator(request.object_id, Some(seq))
.await?;
let (_, transaction_digest) = parent_iterator
.first()
.ok_or(FastPayError::CertificateNotfound)?;
let (_, transaction_digest) =
parent_iterator
.first()
.ok_or(FastPayError::ParentNotfound {
object_id: request.object_id,
sequence: seq,
})?;
// Get the cert from the transaction digest
Some(
self.read_certificate(transaction_digest)
Expand Down
212 changes: 209 additions & 3 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use move_core_types::language_storage::TypeTag;
use rand::seq::SliceRandom;
use typed_store::rocks::open_cf;

use std::collections::HashSet;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::env;
use std::path::Path;
Expand Down Expand Up @@ -266,6 +267,7 @@ impl<A> ClientState<A> {
}
}

#[allow(dead_code)]
#[derive(Clone)]
struct CertificateRequester<A> {
committee: Committee,
Expand Down Expand Up @@ -300,9 +302,18 @@ where
&mut self,
(object_id, sequence_number): (ObjectID, SequenceNumber),
) -> Result<CertifiedOrder, FastPayError> {
// BUG(https://github.com/MystenLabs/fastnft/issues/290): This function assumes that requesting the parent cert of object seq+1 will give the cert of
// that creates the object. This is not true, as objects may be deleted and may not have a seq+1
// to look up.
//
// The authority `handle_object_info_request` is now fixed to return the parent at seq, and not
// seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts
// query to the old (incorrect) behavious to not break tests everywhere.
let inner_sequence_number = sequence_number.increment();

let request = ObjectInfoRequest {
object_id,
request_sequence_number: Some(sequence_number),
request_sequence_number: Some(inner_sequence_number),
request_received_transfers_excluding_first_nth: None,
};
// Sequentially try each authority in random order.
Expand All @@ -315,14 +326,19 @@ where
.requested_certificate
.expect("Unable to get certificate");
if certificate.check(&self.committee).is_ok() {
// BUG (https://github.com/MystenLabs/fastnft/issues/290): Orders do not have a sequence number any more, objects do.
/*
let order = &certificate.order;
if let Some(sender) = self.sender {
if order.sender() == &sender && order.sequence_number() == sequence_number {

if order.sender() == &sender && order.sequence_number() == inner_sequence_number {
return Ok(certificate.clone());
}
} else {
return Ok(certificate.clone());
}
*/
return Ok(certificate);
}
}
}
Expand All @@ -334,6 +350,183 @@ impl<A> ClientState<A>
where
A: AuthorityClient + Send + Sync + 'static + Clone,
{
/// Sync a certificate and all its dependencies to a destination authority, using a
/// source authority to get information about parent certificates.
///
/// Note: Both source and destination may be byzantine, therefore one should always
/// time limit the call to this function to avoid byzantine authorities consuming
/// an unbounded amount of resources.
async fn sync_authority_source_to_destination(
&self,
cert: ConfirmationOrder,
source_authority: AuthorityName,
destination_authority: AuthorityName,
) -> Result<(), FastPayError> {
let source_client = self.authority_clients[&source_authority].clone();
let mut destination_client = self.authority_clients[&destination_authority].clone();

// This represents a stack of certificates that we need to register with the
// destination authority. The stack is a LIFO queue, and therefore later insertions
// represent certificates that earlier insertions depend on. Thus updating an
// authority in the order we pop() the certificates from this stack should ensure
// certificates are uploaded in causal order.
let digest = cert.certificate.order.digest();
let mut missing_certificates: Vec<_> = vec![cert.clone()];

// We keep a list of certificates already processed to avoid duplicates
let mut candidate_certificates: HashSet<TransactionDigest> =
vec![digest].into_iter().collect();
let mut attempted_certificates: HashSet<TransactionDigest> = HashSet::new();

while let Some(target_cert) = missing_certificates.pop() {
match destination_client
.handle_confirmation_order(target_cert.clone())
.await
{
Ok(_) => continue,
Err(FastPayError::ObjectNotFound { .. })
| Err(FastPayError::MissingEalierConfirmations { .. }) => {}
Err(e) => return Err(e),
}

// If we are here it means that the destination authority is missing
// the previous certificates, so we need to read them from the source
// authority.

// The first time we cannot find the cert from the destination authority
// we try to get its dependencies. But the second time we have already tried
// to update its dependencies, so we should just admit failure.
let cert_digest = target_cert.certificate.order.digest();
if attempted_certificates.contains(&cert_digest) {
return Err(FastPayError::AuthorityInformationUnavailable);
}
attempted_certificates.insert(cert_digest);

// TODO: Eventually the client will store more information, and we could
// first try to read certificates and parents from a local cache before
// asking an authority.
let input_objects = target_cert.certificate.order.input_objects();

// Put back the target cert
missing_certificates.push(target_cert);

for object_ref in input_objects {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to require every parent certificate from the authority, or could it be that we're just missing one or two? Could we find out by inspecting FastPayError::MissingEarlierConfirmations?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, sadly, the authority does not provide us in one shot with all missing objects. Probably we should re-factor to do that (the information is there and already read from the DB).

// Request the parent certificate from the authority.
let object_info_response = source_client
.handle_object_info_request(ObjectInfoRequest {
object_id: object_ref.0,
request_sequence_number: Some(object_ref.1),
request_received_transfers_excluding_first_nth: None,
})
.await;

let object_info = match object_info_response {
Ok(object_info) => object_info,
// Here we cover the case the object genuinely has no parent.
Err(FastPayError::ParentNotfound { .. }) => {
continue;
}
Err(e) => return Err(e),
};

let returned_certificate = object_info
.requested_certificate
.ok_or(FastPayError::AuthorityInformationUnavailable)?;
let returned_digest = returned_certificate.order.digest();

// We check that we are not processing twice the same certificate, as
// it would be common if two objects used by one order, were also both
// mutated by the same preceeding order.
if !candidate_certificates.contains(&returned_digest) {
// Add this cert to the set we have processed
candidate_certificates.insert(returned_digest);

// Check & Add it to the list of certificates to sync
returned_certificate.check(&self.committee).map_err(|_| {
FastPayError::ByzantineAuthoritySuspicion {
authority: source_authority,
}
})?;
missing_certificates.push(ConfirmationOrder::new(returned_certificate));
}
}
}

Ok(())
}

/// Sync a certificate to an authority.
///
/// This function infers which authorities have the history related to
/// a certificate and attempts `retries` number of them, sampled accoding to
/// stake, in order to bring the destination authority up to date to accept
/// the certificate. The time devoted to each attempt is bounded by
/// `timeout_milliseconds`.
pub async fn sync_certificate_to_authority_with_timeout(
&self,
cert: ConfirmationOrder,
destination_authority: AuthorityName,
timeout_milliseconds: u64,
retries: usize,
) -> Result<(), FastPayError> {
// Extract the set of authorities that should have this certificate
// and its full history. We should be able to use these are source authorities.
let mut candidate_source_authorties: HashSet<AuthorityName> = cert
.certificate
.signatures
.iter()
.map(|(name, _)| *name)
.collect();

// Sample a `retries` number of distinct authorities by stake.
let mut source_authorities: Vec<AuthorityName> = Vec::new();
while source_authorities.len() < retries && !candidate_source_authorties.is_empty() {
// Here we do rejection sampling.
//
// TODO: add a filter parameter to sample, so that we can directly
// sample from a subset which is more efficient.
let sample_authority = self.committee.sample();
if candidate_source_authorties.contains(sample_authority) {
candidate_source_authorties.remove(sample_authority);
source_authorities.push(*sample_authority);
}
}

// Now try to update the destination authority sequentially using
// the source authorities we have sampled.
for source_authority in source_authorities {
// Note: here we could improve this function by passing into the
// `sync_authority_source_to_destination` call a cache of
// certificates and parents to avoid re-downloading them.
if timeout(
Duration::from_millis(timeout_milliseconds),
self.sync_authority_source_to_destination(
cert.clone(),
source_authority,
destination_authority,
),
)
.await
.is_ok()
{
// If the updates suceeds we return, since there is no need
// to try other sources.
return Ok(());
}

// If we are here it means that the update failed, either due to the
// source being faulty or the destination being faulty.
//
// TODO: We should probably be keeping a record of suspected faults
// upon failure to de-prioritize authorities that we have observed being
// less reliable.
}

// Eventually we should add more information to this error about the destination
// and maybe event the certificiate.
Err(FastPayError::AuthorityUpdateFailure)
}

#[cfg(test)]
async fn request_certificate(
&mut self,
Expand Down Expand Up @@ -530,6 +723,10 @@ where
}

/// Broadcast missing confirmation orders and execute provided authority action on each authority.
// BUG(https://github.com/MystenLabs/fastnft/issues/290): This logic for
// updating an authority that is behind is not correct, since we now have
// potentially many dependencies that need to be satisfied, not just a
// list.
async fn broadcast_and_execute<'a, V, F: 'a>(
&'a mut self,
sender: FastPayAddress,
Expand Down Expand Up @@ -591,6 +788,7 @@ where
number = seq.decrement();
}
}

// Send all missing confirmation orders.
missing_certificates.reverse();
missing_certificates.extend(certificates_to_broadcast.clone());
Expand Down Expand Up @@ -938,7 +1136,15 @@ where
let response = self
.get_object_info(ObjectInfoRequest {
object_id: *certificate.order.object_id(),
request_sequence_number: Some(transfer.object_ref.1),
// BUG(https://github.com/MystenLabs/fastnft/issues/290):
// This function assumes that requesting the parent cert of object seq+1 will give the cert of
// that creates the object. This is not true, as objects may be deleted and may not have a seq+1
// to look up.
//
// The authority `handle_object_info_request` is now fixed to return the parent at seq, and not
// seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts
// query to the old (incorrect) behavious to not break tests everywhere.
request_sequence_number: Some(transfer.object_ref.1.increment()),
request_received_transfers_excluding_first_nth: None,
})
.await?;
Expand Down
Loading