Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1510 Implement connection pool tracing messages #766

Merged
merged 12 commits into from
Nov 7, 2022
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bitflags = "1.1.0"
bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" }
chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] }
derivative = "2.1.1"
derive_more = "0.99.17"
flate2 = { version = "1.0", optional = true }
futures-io = "0.3.21"
futures-core = "0.3.14"
Expand Down Expand Up @@ -168,7 +169,6 @@ features = ["v4"]
approx = "0.5.1"
async_once = "0.2.6"
ctrlc = "3.2.2"
derive_more = "0.99.13"
function_name = "0.2.1"
futures = "0.3"
hex = "0.4"
Expand Down
5 changes: 5 additions & 0 deletions src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ where
pub(crate) fn current_batch(&self) -> &VecDeque<RawDocumentBuf> {
self.cursor.current_batch()
}

#[cfg(test)]
pub(crate) fn client(&self) -> &crate::Client {
self.cursor.client()
}
}

/// Arguments passed to a `watch` method, captured to allow resume.
Expand Down
2 changes: 1 addition & 1 deletion src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub use resolver_config::ResolverConfig;
#[cfg(feature = "csfle")]
pub use crate::client::csfle::options::AutoEncryptionOptions;

const DEFAULT_PORT: u16 = 27017;
pub(crate) const DEFAULT_PORT: u16 = 27017;

const URI_OPTIONS: &[&str] = &[
"appname",
Expand Down
46 changes: 28 additions & 18 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
compression::Compressor,
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventHandler,
CmapEventEmitter,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
Expand Down Expand Up @@ -81,10 +81,10 @@ pub(crate) struct Connection {
/// been read.
command_executing: bool,

/// Whether or not this connection has experienced a network error while reading or writing.
/// Once the connection has received an error, it should not be used again or checked back
/// into a pool.
error: bool,
/// Stores a network error encountered while reading or writing. Once the connection has
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old comment seems a little wrong, because we check connections that have errored back into their pools and then allow the pool to handle closing them.

/// received an error, it should not be used again and will be closed upon check-in to the
/// pool.
error: Option<Error>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per conversation with @patrickfreed on the spec PR here we decided to add the errors that cause connections to be closed to "connection closed" tracing events when relevant, and similarly add the errors that cause checkout to fail to "connection checkout failed" tracing events. this required storing the error on the connection, so at the time the pool closes it we can access the error.


/// Whether the most recently received message included the moreToCome flag, indicating the
/// server may send more responses without any additional requests. Attempting to send new
Expand All @@ -106,8 +106,10 @@ pub(crate) struct Connection {
/// connection to the pin holder.
pinned_sender: Option<mpsc::Sender<Connection>>,

/// Type responsible for emitting events related to this connection. This is None for
/// monitoring connections as we do not emit events for those.
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
event_emitter: Option<CmapEventEmitter>,
}

impl Connection {
Expand All @@ -126,9 +128,9 @@ impl Connection {
ready_and_available_time: None,
stream: BufStream::new(stream),
address,
handler: None,
event_emitter: None,
stream_description: None,
error: false,
error: None,
pinned_sender: None,
compressor: None,
more_to_come: false,
Expand All @@ -149,7 +151,7 @@ impl Connection {
pending_connection.id,
generation,
);
conn.handler = pending_connection.event_handler;
conn.event_emitter = Some(pending_connection.event_emitter);
conn
}

Expand Down Expand Up @@ -211,7 +213,7 @@ impl Connection {

/// Checks if the connection experienced a network error and should be closed.
pub(super) fn has_errored(&self) -> bool {
self.error
self.error.is_some()
}

/// Helper to create a `ConnectionCheckedOutEvent` for the connection.
Expand Down Expand Up @@ -244,6 +246,8 @@ impl Connection {
address: self.address.clone(),
connection_id: self.id,
reason,
#[cfg(feature = "tracing-unstable")]
error: self.error.clone(),
}
}

Expand Down Expand Up @@ -272,7 +276,9 @@ impl Connection {
_ => message.write_to(&mut self.stream).await,
};

self.error = write_result.is_err();
if let Err(ref err) = write_result {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Result.err() would be nice here, but it's still unstable API. https://doc.rust-lang.org/std/result/enum.Result.html#method.err

self.error = Some(err.clone());
}
write_result?;

let response_message_result = Message::read_from(
Expand All @@ -283,7 +289,9 @@ impl Connection {
)
.await;
self.command_executing = false;
self.error = response_message_result.is_err();
if let Err(ref err) = response_message_result {
self.error = Some(err.clone());
}

let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
Expand Down Expand Up @@ -342,7 +350,9 @@ impl Connection {
)
.await;
self.command_executing = false;
self.error = response_message_result.is_err();
if let Err(ref err) = response_message_result {
self.error = Some(err.clone());
}

let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);
Expand Down Expand Up @@ -390,8 +400,8 @@ impl Connection {
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
fn close(&mut self, reason: ConnectionClosedReason) {
self.pool_manager.take();
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(self.closed_event(reason));
if let Some(ref event_emitter) = self.event_emitter {
event_emitter.emit_event(|| self.closed_event(reason).into());
}
}

Expand All @@ -404,10 +414,10 @@ impl Connection {
address: self.address.clone(),
generation: self.generation,
stream: std::mem::replace(&mut self.stream, BufStream::new(AsyncStream::Null)),
handler: self.handler.take(),
event_emitter: self.event_emitter.take(),
stream_description: self.stream_description.take(),
command_executing: self.command_executing,
error: self.error,
error: self.error.take(),
pool_manager: None,
ready_and_available_time: None,
pinned_sender: self.pinned_sender.clone(),
Expand Down Expand Up @@ -571,7 +581,7 @@ pub(crate) struct PendingConnection {
pub(crate) id: u32,
pub(crate) address: ServerAddress,
pub(crate) generation: PoolGeneration,
pub(crate) event_handler: Option<Arc<dyn CmapEventHandler>>,
pub(crate) event_emitter: CmapEventEmitter,
}

impl PendingConnection {
Expand Down
71 changes: 38 additions & 33 deletions src/cmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub(crate) mod options;
mod status;
mod worker;

use std::sync::Arc;

use derivative::Derivative;
#[cfg(test)]
use tokio::sync::oneshot;
Expand All @@ -30,7 +28,8 @@ use crate::{
bson::oid::ObjectId,
error::{Error, Result},
event::cmap::{
CmapEventHandler,
CmapEvent,
CmapEventEmitter,
ConnectionCheckoutFailedEvent,
ConnectionCheckoutFailedReason,
ConnectionCheckoutStartedEvent,
Expand Down Expand Up @@ -60,40 +59,44 @@ pub(crate) struct ConnectionPool {
generation_subscriber: PoolGenerationSubscriber,

#[derivative(Debug = "ignore")]
event_handler: Option<Arc<dyn CmapEventHandler>>,
event_emitter: CmapEventEmitter,
}

impl ConnectionPool {
pub(crate) fn new(
address: ServerAddress,
connection_establisher: ConnectionEstablisher,
server_updater: TopologyUpdater,
topology_id: ObjectId,
options: Option<ConnectionPoolOptions>,
) -> Self {
let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());

let event_emitter = CmapEventEmitter::new(event_handler, topology_id);

let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
address.clone(),
connection_establisher,
server_updater,
event_emitter.clone(),
options.clone(),
);

let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());

if let Some(ref handler) = event_handler {
handler.handle_pool_created_event(PoolCreatedEvent {
event_emitter.emit_event(|| {
CmapEvent::PoolCreated(PoolCreatedEvent {
address: address.clone(),
options: options.map(|o| o.to_event_options()),
});
};
})
});

Self {
address,
manager,
connection_requester,
generation_subscriber,
event_handler,
event_emitter,
}
}

Expand All @@ -109,29 +112,19 @@ impl ConnectionPool {
manager,
connection_requester,
generation_subscriber,
event_handler: None,
}
}

fn emit_event<F>(&self, emit: F)
where
F: FnOnce(&Arc<dyn CmapEventHandler>),
{
if let Some(ref handler) = self.event_handler {
emit(handler);
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
}
}

/// Checks out a connection from the pool. This method will yield until this thread is at the
/// front of the wait queue, and then will block again if no available connections are in the
/// pool and the total number of connections is not less than the max pool size.
pub(crate) async fn check_out(&self) -> Result<Connection> {
self.emit_event(|handler| {
let event = ConnectionCheckoutStartedEvent {
self.event_emitter.emit_event(|| {
ConnectionCheckoutStartedEvent {
address: self.address.clone(),
};

handler.handle_connection_checkout_started_event(event);
}
.into()
});

let response = self.connection_requester.request().await;
Expand All @@ -146,16 +139,28 @@ impl ConnectionPool {

match conn {
Ok(ref conn) => {
self.emit_event(|handler| {
handler.handle_connection_checked_out_event(conn.checked_out_event());
self.event_emitter
.emit_event(|| conn.checked_out_event().into());
}
#[cfg(feature = "tracing-unstable")]
Err(ref err) => {
self.event_emitter.emit_event(|| {
ConnectionCheckoutFailedEvent {
address: self.address.clone(),
reason: ConnectionCheckoutFailedReason::ConnectionError,
error: Some(err.clone()),
}
.into()
});
}
#[cfg(not(feature = "tracing-unstable"))]
Err(_) => {
self.emit_event(|handler| {
handler.handle_connection_checkout_failed_event(ConnectionCheckoutFailedEvent {
self.event_emitter.emit_event(|| {
ConnectionCheckoutFailedEvent {
address: self.address.clone(),
reason: ConnectionCheckoutFailedReason::ConnectionError,
})
}
.into()
});
}
}
Expand Down
Loading