RUST-1222 Cancel in-progress operations when SDAM heartbeats time out #1249
message: Message,
// This value is only read if a compression feature flag is enabled.
#[allow(unused_variables)] can_compress: bool,
message: impl TryInto<Message, Error = impl Into<Error>>,
Small refactor here to avoid needing to add an equivalent send_command_with_cancellation for pending connections.
// A lagged error indicates that more heartbeats failed than the channel's capacity
// between checking out this connection and executing the operation. If this occurs,
// then proceed with cancelling the operation. RecvError::Closed can be ignored, as
// the sender (and by extension the connection pool) dropping does not indicate that
// the operation should be cancelled.
The lagged scenario I outlined here will probably never actually happen. The lifetime of a receiver is intentionally very short so that only relevant messages are received. The following would need to occur:
- A connection is checked out for an operation, which constructs a new receiver. This new receiver will only receive messages sent after its construction.
- The execution path proceeds with the rest of the steps between checkout and actually sending the message, which is primarily building the command. In the meantime...
- An SDAM heartbeat times out, leading to a pool clear and a message being stored in the channel.
- Another SDAM heartbeat times out after waiting the full heartbeat interval, and another cancellation message is sent out.
- Slow command construction finishes and send_message_with_cancellation is called; recv below immediately returns a lagged error because the receiver has two unseen messages from the two heartbeat timeouts, which exceeds the channel's capacity. In this case we still want to proceed with cancellation.
These receivers are kind of acting like oneshots in that they're created fresh for each checked-out connection and only call recv once (i.e. on the below line), so the important thing here is to determine whether something was sent during their lifetime.
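To make the handling of the three possible recv results concrete, here is a minimal std-only sketch. The `RecvError` enum is a local stand-in that mirrors the two variants of tokio's `broadcast::error::RecvError`, and `Outcome` and `outcome_of_recv` are hypothetical names used only for illustration, not the driver's actual code.

```rust
// Local stand-in mirroring the variants of tokio's
// `broadcast::error::RecvError`, so the sketch compiles with std only.
#[derive(Debug, Clone, PartialEq)]
enum RecvError {
    // The sender (and by extension the pool) was dropped.
    Closed,
    // The receiver missed this many messages.
    Lagged(u64),
}

// Hypothetical result type for this sketch.
#[derive(Debug, PartialEq)]
enum Outcome {
    // Something was sent during the receiver's lifetime: cancel the operation.
    Cancel,
    // Nothing indicates cancellation: let the operation keep running.
    Continue,
}

// Hypothetical helper: maps the result of the single `recv` call to a decision.
fn outcome_of_recv(result: Result<(), RecvError>) -> Outcome {
    match result {
        // A message arrived, or more messages arrived than the channel holds.
        // Either way at least one cancellation was broadcast after checkout.
        Ok(()) | Err(RecvError::Lagged(_)) => Outcome::Cancel,
        // A dropped sender does not mean the operation should be cancelled.
        Err(RecvError::Closed) => Outcome::Continue,
    }
}
```

The point of the sketch is only that `Lagged` is grouped with `Ok(())` while `Closed` is not.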
"Pool clear SHOULD schedule the next background thread run immediately \
(interruptInUseConnections = false)",
This language and the test I've skipped above were added in the PR for the spec work for this ticket:
A pool SHOULD allow immediate scheduling of the next background thread iteration after a clear is performed.
However, this doesn't necessarily seem relevant to interrupting in-use connections, and the test above is testing behavior when interruptInUseConnections is false. I'm also not sure what it would take to "schedule" maintenance with our existing pool design given that it already happens on an interval when no other requests have been sent to the pool. I'm inclined to file a ticket for implementing this behavior to minimize the scope of this work. LMK if that sounds good to you.
Yup, that makes sense.
LGTM! This is much less invasive than any solution I could see.
message: impl TryInto<Message, Error = impl Into<Error>>,
cancellation_receiver: &mut broadcast::Receiver<()>,
) -> Result<RawCommandResponse> {
tokio::select! {
Nit: should this have a biased; clause to make error behavior deterministic?
Introduces a new broadcast::Sender to the connection pool worker that broadcasts a cancellation message when the connection pool clears due to a network timeout. Each time a connection is checked out, it retrieves a corresponding broadcast::Receiver that listens for a cancellation message while it executes the operation.
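As a rough illustration of the lifetime semantics described above, the following std-only analogue swaps the tokio broadcast channel for a shared counter. This is a different mechanism from the one in the PR (it is polled rather than awaited), and every name in it (`CancellationSender`, `CancellationReceiver`, `subscribe`, `broadcast_cancellation`, `cancelled`) is hypothetical. It only shows that a receiver created at checkout ignores earlier pool clears and reports cancellation if one or more clears happen afterwards.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical pool-side handle: counts pool clears caused by network timeouts.
struct CancellationSender {
    clears: Arc<AtomicU64>,
}

// Hypothetical per-checkout handle: remembers the count seen at checkout.
struct CancellationReceiver {
    clears: Arc<AtomicU64>,
    seen_at_checkout: u64,
}

impl CancellationSender {
    fn new() -> Self {
        CancellationSender {
            clears: Arc::new(AtomicU64::new(0)),
        }
    }

    // Called when a connection is checked out. The new receiver only
    // observes clears that happen after this point.
    fn subscribe(&self) -> CancellationReceiver {
        CancellationReceiver {
            clears: Arc::clone(&self.clears),
            seen_at_checkout: self.clears.load(Ordering::SeqCst),
        }
    }

    // Called when the pool clears due to a network timeout.
    fn broadcast_cancellation(&self) {
        self.clears.fetch_add(1, Ordering::SeqCst);
    }
}

impl CancellationReceiver {
    // True if at least one clear happened during this receiver's lifetime,
    // regardless of how many there were.
    fn cancelled(&self) -> bool {
        self.clears.load(Ordering::SeqCst) != self.seen_at_checkout
    }
}
```

In this analogue, two clears after checkout are indistinguishable from one, which matches the intent of treating a lagged receiver as cancelled.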