Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored sequence_number and fixed bidirectional object transfer #50

Merged
merged 10 commits into from
Dec 21, 2021
14 changes: 7 additions & 7 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ fn make_client_state(
account.key.copy(),
committee,
authority_clients,
account.next_sequence_number,
account.sent_certificates.clone(),
account.received_certificates.clone(),
account.object_ids.clone(),
Expand All @@ -97,16 +96,19 @@ fn make_benchmark_transfer_orders(
// TODO: deterministic sequence of orders to recover from interrupted benchmarks.
let mut next_recipient = get_key_pair().0;
for account in accounts_config.accounts_mut() {
let object_id = *account.object_ids.first().unwrap();
let object_id = account.object_ids.clone().into_keys().next().unwrap();
let transfer = Transfer {
object_id,
sender: account.address,
recipient: Address::FastPay(next_recipient),
sequence_number: account.next_sequence_number,
sequence_number: account.object_ids[&object_id],
user_data: UserData::default(),
};
debug!("Preparing transfer order: {:?}", transfer);
account.next_sequence_number = account.next_sequence_number.increment().unwrap();
account.object_ids.insert(
object_id,
account.object_ids[&object_id].increment().unwrap(),
);
next_recipient = account.address;
let order = Order::new_transfer(transfer.clone(), &account.key);
orders.push(order.clone());
Expand Down Expand Up @@ -430,7 +432,7 @@ fn main() {

let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let mut client_state = make_client_state(
let client_state = make_client_state(
&accounts_config,
&committee_config,
user_address,
Expand All @@ -440,10 +442,8 @@ fn main() {
);
info!("Starting balance query");
let time_start = Instant::now();
let amount = client_state.get_spendable_amount().await.unwrap();
let time_total = time_start.elapsed().as_micros();
info!("Balance confirmed after {} us", time_total);
println!("{:?}", amount);
accounts_config.update_from_state(&client_state);
accounts_config
.write(accounts_config_path)
Expand Down
10 changes: 5 additions & 5 deletions fastpay/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ pub struct UserAccount {
)]
pub address: FastPayAddress,
pub key: KeyPair,
pub next_sequence_number: SequenceNumber,
pub object_ids: Vec<ObjectID>,
pub object_ids: BTreeMap<ObjectID, SequenceNumber>,
pub sent_certificates: Vec<CertifiedOrder>,
pub received_certificates: Vec<CertifiedOrder>,
}
Expand All @@ -110,8 +109,10 @@ impl UserAccount {
Self {
address,
key,
next_sequence_number: SequenceNumber::new(),
object_ids,
object_ids: object_ids
.into_iter()
.map(|object_id| (object_id, SequenceNumber::new()))
.collect(),
sent_certificates: Vec::new(),
received_certificates: Vec::new(),
}
Expand Down Expand Up @@ -144,7 +145,6 @@ impl AccountsConfig {
.accounts
.get_mut(&state.address())
.expect("Updated account should already exist");
account.next_sequence_number = state.next_sequence_number();
account.object_ids = state.object_ids().clone();
account.sent_certificates = state.sent_certificates().clone();
account.received_certificates = state.received_certificates().cloned().collect();
Expand Down
171 changes: 95 additions & 76 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ pub struct ClientState<AuthorityClient> {
committee: Committee,
/// How to talk to this committee.
authority_clients: HashMap<AuthorityName, AuthorityClient>,
/// Expected sequence number for the next certified transfer.
/// This is also the number of transfer certificates that we have created.
next_sequence_number: SequenceNumber,
/// Pending transfer.
pending_transfer: Option<Order>,

Expand All @@ -54,9 +51,9 @@ pub struct ClientState<AuthorityClient> {
sent_certificates: Vec<CertifiedOrder>,
/// Known received certificates, indexed by sender and sequence number.
/// TODO: API to search and download yet unknown `received_certificates`.
received_certificates: BTreeMap<(FastPayAddress, SequenceNumber), CertifiedOrder>,
/// The known objects owned by the client.
object_ids: Vec<ObjectID>,
received_certificates: BTreeMap<ObjectRef, CertifiedOrder>,
/// The known objects with it's sequence number owned by the client.
object_ids: BTreeMap<ObjectID, SequenceNumber>,
}

// Operations are considered successful when they successfully reach a quorum of authorities.
Expand Down Expand Up @@ -98,7 +95,10 @@ pub trait Client {
/// Find how much money we can spend.
/// TODO: Currently, this value only reflects received transfers that were
/// locally processed by `receive_from_fastpay`.
fn get_spendable_amount(&mut self) -> AsyncResult<'_, Amount, anyhow::Error>;
fn get_spendable_amount(
&mut self,
object_id: ObjectID,
) -> AsyncResult<'_, Amount, anyhow::Error>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in thinking this function will probably have to go? Since we do not deal with amounts any more.

}

impl<A> ClientState<A> {
Expand All @@ -108,17 +108,15 @@ impl<A> ClientState<A> {
secret: KeyPair,
committee: Committee,
authority_clients: HashMap<AuthorityName, A>,
next_sequence_number: SequenceNumber,
sent_certificates: Vec<CertifiedOrder>,
received_certificates: Vec<CertifiedOrder>,
object_ids: Vec<ObjectID>,
object_ids: BTreeMap<ObjectID, SequenceNumber>,
) -> Self {
Self {
address,
secret,
committee,
authority_clients,
next_sequence_number,
pending_transfer: None,
sent_certificates,
received_certificates: received_certificates
Expand All @@ -133,11 +131,11 @@ impl<A> ClientState<A> {
self.address
}

pub fn next_sequence_number(&self) -> SequenceNumber {
self.next_sequence_number
pub fn next_sequence_number(&self, object_id: ObjectID) -> SequenceNumber {
self.object_ids[&object_id]
}

pub fn object_ids(&self) -> &Vec<ObjectID> {
pub fn object_ids(&self) -> &BTreeMap<ObjectID, SequenceNumber> {
&self.object_ids
}

Expand Down Expand Up @@ -470,59 +468,48 @@ where
/// Make sure we have all our certificates with sequence number
/// in the range 0..self.next_sequence_number
async fn download_sent_certificates(&self) -> Result<Vec<CertifiedOrder>, FastPayError> {
let mut requesters: Vec<CertificateRequester<_>> = self
.object_ids
.iter()
.map(|object_id| {
CertificateRequester::new(
self.committee.clone(),
self.authority_clients.values().cloned().collect(),
self.address,
*object_id,
)
})
.collect();
let known_sequence_numbers: BTreeSet<_> = self
.sent_certificates
.iter()
.map(|cert| cert.order.sequence_number())
.collect();
let mut sent_certificates = self.sent_certificates.clone();
let mut number = SequenceNumber::from(0);
while number < self.next_sequence_number {
if !known_sequence_numbers.contains(&number) {
let mut requesters = requesters.iter_mut();
// Because we don't know which object is associated to which sequence number, so we try to query each authority with the sequence number until we have a hit.
// TODO: Rethink how we store sequence number, maybe the client should hold a map of sequence number to object id, or each object should have their own sequence number.
let certificate = loop {
let requester = requesters
.next()
.ok_or(FastPayError::CertificateNotfound)
.unwrap();
if let Ok(cert) = requester.query(number).await {
break cert;
}
};
sent_certificates.push(certificate);

for (object_id, next_sequence_number) in self.object_ids.clone() {
let mut requester = CertificateRequester::new(
self.committee.clone(),
self.authority_clients.values().cloned().collect(),
self.address,
object_id,
);

// TODO: it's inefficient to loop through sequence numbers to retrieve missing cert, rethink this logic when we change certificate storage in client.
let mut number = SequenceNumber::from(0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always start looking at 0, or should we start at the highest known cert here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is remnant of fastpay, shouldn't need to loop through all the certs after we change how we store certs in client, will add a TODO

while number < next_sequence_number {
if !known_sequence_numbers.contains(&number) {
let certificate = requester.query(number).await?;
sent_certificates.push(certificate);
}
number = number.increment().unwrap_or_else(|_| SequenceNumber::max());
}
number = number.increment().unwrap_or_else(|_| SequenceNumber::max());
}

sent_certificates.sort_by_key(|cert| cert.order.sequence_number());
Ok(sent_certificates)
}

/// Transfers an object to a recipient address.
async fn transfer(
&mut self,
object_id: ObjectID,
(object_id, sequence_number): ObjectRef,
recipient: Address,
user_data: UserData,
) -> Result<CertifiedOrder, anyhow::Error> {
let transfer = Transfer {
object_id,
sender: self.address,
recipient,
sequence_number: self.next_sequence_number,
sequence_number,
user_data,
};
let order = Order::new_transfer(transfer, &self.secret);
Expand All @@ -538,8 +525,9 @@ where
fn update_sent_certificates(
&mut self,
sent_certificates: Vec<CertifiedOrder>,
object_id: ObjectID,
) -> Result<(), FastPayError> {
let mut new_next_sequence_number = self.next_sequence_number;
let mut new_next_sequence_number = self.next_sequence_number(object_id);
for new_cert in &sent_certificates {
if new_cert.order.sequence_number() >= new_next_sequence_number {
new_next_sequence_number = new_cert
Expand All @@ -554,11 +542,30 @@ where
*/
// Atomic update
self.sent_certificates = sent_certificates;
self.next_sequence_number = new_next_sequence_number;
self.object_ids.insert(object_id, new_next_sequence_number);
// Sanity check
// Some certificates of the object will be from received_certs if the object is originated from other sender.
// TODO: Maybe we should store certificates in one place sorted by object_ref instead of sent/received?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i think that is a good idea. In fact the structures a client needs to store certs may be remarkable similar to the structures within an authority? Ie, an index from object ref to cert that created it?

let mut sent_certificates: Vec<CertifiedOrder> = self
.sent_certificates
.clone()
.into_iter()
.filter(|cert| *cert.order.object_id() == object_id)
.collect();

let mut received_certs: Vec<CertifiedOrder> = self
.received_certificates
.clone()
.into_values()
.into_iter()
.filter(|cert| *cert.order.object_id() == object_id)
.collect();

sent_certificates.append(&mut received_certs);

assert_eq!(
self.sent_certificates.len(),
usize::from(self.next_sequence_number)
sent_certificates.len(),
usize::from(self.next_sequence_number(object_id))
);
Ok(())
}
Expand All @@ -574,7 +581,7 @@ where
"Client state has a different pending transfer",
);
ensure!(
order.sequence_number() == self.next_sequence_number,
order.sequence_number() == self.next_sequence_number(*order.object_id()),
"Unexpected sequence number"
);
self.pending_transfer = Some(order.clone());
Expand All @@ -591,14 +598,16 @@ where
// and `next_sequence_number`. (Note that if we were using persistent
// storage, we should ensure update atomicity in the eventuality of a crash.)
self.pending_transfer = None;
self.update_sent_certificates(new_sent_certificates)?;
self.update_sent_certificates(new_sent_certificates, *order.object_id())?;
// Confirm last transfer certificate if needed.
if with_confirmation {
self.communicate_transfers(
self.address,
*order.object_id(),
self.sent_certificates.clone(),
CommunicateAction::SynchronizeNextSequenceNumber(self.next_sequence_number),
CommunicateAction::SynchronizeNextSequenceNumber(
self.next_sequence_number(*order.object_id()),
),
)
.await?;
}
Expand All @@ -616,7 +625,11 @@ where
recipient: FastPayAddress,
user_data: UserData,
) -> AsyncResult<'_, CertifiedOrder, anyhow::Error> {
Box::pin(self.transfer(object_id, Address::FastPay(recipient), user_data))
Box::pin(self.transfer(
(object_id, self.next_sequence_number(object_id)),
Address::FastPay(recipient),
user_data,
))
}

fn transfer_to_primary(
Expand All @@ -625,28 +638,11 @@ where
recipient: PrimaryAddress,
user_data: UserData,
) -> AsyncResult<'_, CertifiedOrder, anyhow::Error> {
Box::pin(self.transfer(object_id, Address::Primary(recipient), user_data))
}

fn get_spendable_amount(&mut self) -> AsyncResult<'_, Amount, anyhow::Error> {
Box::pin(async move {
if let Some(order) = self.pending_transfer.clone() {
// Finish executing the previous transfer.
self.execute_transfer(order, /* with_confirmation */ false)
.await?;
}
if self.sent_certificates.len() < self.next_sequence_number.into() {
// Recover missing sent certificates.
let new_sent_certificates = self.download_sent_certificates().await?;
self.update_sent_certificates(new_sent_certificates)?;
}
/* let amount = if self.balance < Balance::zero() {
Amount::zero()
} else {
Amount::try_from(self.balance).unwrap_or_else(|_| std::u64::MAX.into())
};*/
Ok(Amount::zero())
})
Box::pin(self.transfer(
(object_id, self.next_sequence_number(object_id)),
Address::Primary(recipient),
user_data,
))
}

fn receive_from_fastpay(
Expand Down Expand Up @@ -674,7 +670,10 @@ where
if let btree_map::Entry::Vacant(entry) =
self.received_certificates.entry(transfer.key())
{
// self.balance = self.balance.try_add(transfer.amount.into())?;
self.object_ids.insert(
transfer.object_id,
transfer.sequence_number.increment().unwrap(),
);
entry.insert(certificate);
}
Ok(())
Expand All @@ -698,7 +697,7 @@ where
sender: self.address,
recipient: Address::FastPay(recipient),
// amount,
sequence_number: self.next_sequence_number,
sequence_number: self.next_sequence_number(object_id),
user_data,
};
let order = Order::new_transfer(transfer, &self.secret);
Expand All @@ -708,4 +707,24 @@ where
Ok(new_certificate)
})
}

fn get_spendable_amount(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename this now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove this in Client API expansion task

&mut self,
object_id: ObjectID,
) -> AsyncResult<'_, Amount, anyhow::Error> {
Box::pin(async move {
if let Some(order) = self.pending_transfer.clone() {
// Finish executing the previous transfer.
self.execute_transfer(order, /* with_confirmation */ false)
.await?;
}
let next_sequence_number = self.next_sequence_number(object_id);
if self.sent_certificates.len() < next_sequence_number.into() {
// Recover missing sent certificates.
let new_sent_certificates = self.download_sent_certificates().await?;
self.update_sent_certificates(new_sent_certificates, object_id)?;
}
Ok(Amount::zero())
})
}
}
Loading