-
Notifications
You must be signed in to change notification settings - Fork 176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RUST-1222 Cancel in-progress operations when SDAM heartbeats time out #1249
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,11 @@ use derive_where::derive_where; | |
use serde::Serialize; | ||
use tokio::{ | ||
io::BufStream, | ||
sync::{mpsc, Mutex}, | ||
sync::{ | ||
broadcast::{self, error::RecvError}, | ||
mpsc, | ||
Mutex, | ||
}, | ||
}; | ||
|
||
use self::wire::{Message, MessageFlags}; | ||
|
@@ -171,12 +175,42 @@ impl Connection { | |
self.error.is_some() | ||
} | ||
|
||
pub(crate) async fn send_message_with_cancellation( | ||
&mut self, | ||
message: impl TryInto<Message, Error = impl Into<Error>>, | ||
cancellation_receiver: &mut broadcast::Receiver<()>, | ||
) -> Result<RawCommandResponse> { | ||
tokio::select! { | ||
// A lagged error indicates that more heartbeats failed than the channel's capacity | ||
// between checking out this connection and executing the operation. If this occurs, | ||
// then proceed with cancelling the operation. RecvError::Closed can be ignored, as | ||
// the sender (and by extension the connection pool) dropping does not indicate that | ||
// the operation should be cancelled. | ||
Comment on lines
+186
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lagged scenario I outlined here will probably never actually happen. The lifetime of a receiver is intentionally very short so that only relevant messages are received. The following would need to occur:
These receivers are kind of acting like oneshots in that they're created fresh for each checked-out connection and only call |
||
Ok(_) | Err(RecvError::Lagged(_)) = cancellation_receiver.recv() => { | ||
let error: Error = ErrorKind::ConnectionPoolCleared { | ||
message: format!( | ||
"Connection to {} interrupted due to server monitor timeout", | ||
self.address, | ||
) | ||
}.into(); | ||
self.error = Some(error.clone()); | ||
Err(error) | ||
} | ||
// This future is not cancellation safe because it contains calls to methods that are | ||
// not cancellation safe (e.g. AsyncReadExt::read_exact). However, in the case that | ||
// this future is cancelled because a cancellation message was received, this | ||
// connection will be closed upon being returned to the pool, so any data loss on its | ||
// underlying stream is not an issue. | ||
result = self.send_message(message) => result, | ||
} | ||
} | ||
|
||
pub(crate) async fn send_message( | ||
&mut self, | ||
message: Message, | ||
// This value is only read if a compression feature flag is enabled. | ||
#[allow(unused_variables)] can_compress: bool, | ||
message: impl TryInto<Message, Error = impl Into<Error>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. small refactor here to avoid needing to add an equivalent |
||
) -> Result<RawCommandResponse> { | ||
let message = message.try_into().map_err(Into::into)?; | ||
|
||
if self.more_to_come { | ||
return Err(Error::internal(format!( | ||
"attempted to send a new message to {} but moreToCome bit was set", | ||
|
@@ -192,7 +226,7 @@ impl Connection { | |
feature = "snappy-compression" | ||
))] | ||
let write_result = match self.compressor { | ||
Some(ref compressor) if can_compress => { | ||
Some(ref compressor) if message.should_compress => { | ||
message | ||
.write_op_compressed_to(&mut self.stream, compressor) | ||
.await | ||
|
@@ -232,21 +266,6 @@ impl Connection { | |
)) | ||
} | ||
|
||
/// Executes a `Command` and returns a `CommandResponse` containing the result from the server. | ||
/// | ||
/// An `Ok(...)` result simply means the server received the command and that the driver | ||
/// driver received the response; it does not imply anything about the success of the command | ||
/// itself. | ||
pub(crate) async fn send_command( | ||
&mut self, | ||
command: Command, | ||
request_id: impl Into<Option<i32>>, | ||
) -> Result<RawCommandResponse> { | ||
let to_compress = command.should_compress(); | ||
let message = Message::from_command(command, request_id.into())?; | ||
self.send_message(message, to_compress).await | ||
} | ||
|
||
/// Receive the next message from the connection. | ||
/// This will return an error if the previous response on this connection did not include the | ||
/// moreToCome flag. | ||
|
@@ -378,6 +397,7 @@ pub(crate) struct PendingConnection { | |
pub(crate) generation: PoolGeneration, | ||
pub(crate) event_emitter: CmapEventEmitter, | ||
pub(crate) time_created: Instant, | ||
pub(crate) cancellation_receiver: Option<broadcast::Receiver<()>>, | ||
} | ||
|
||
impl PendingConnection { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: should this have a
biased;
clause to make error behavior deterministic?