Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1510 Implement connection pool tracing messages #766

Merged
merged 12 commits into from
Nov 7, 2022
19 changes: 11 additions & 8 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::{
compression::Compressor,
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventHandler,
CmapEvent,
CmapEventEmitter,
ConnectionCheckedInEvent,
ConnectionCheckedOutEvent,
ConnectionClosedEvent,
Expand Down Expand Up @@ -106,8 +107,10 @@ pub(crate) struct Connection {
/// connection to the pin holder.
pinned_sender: Option<mpsc::Sender<Connection>>,

/// Type responsible for emitting events related to this connection. This is None for
/// monitoring connections as we do not emit events for those.
#[derivative(Debug = "ignore")]
handler: Option<Arc<dyn CmapEventHandler>>,
event_emitter: Option<CmapEventEmitter>,
}

impl Connection {
Expand All @@ -126,7 +129,7 @@ impl Connection {
ready_and_available_time: None,
stream: BufStream::new(stream),
address,
handler: None,
event_emitter: None,
stream_description: None,
error: false,
pinned_sender: None,
Expand All @@ -149,7 +152,7 @@ impl Connection {
pending_connection.id,
generation,
);
conn.handler = pending_connection.event_handler;
conn.event_emitter = Some(pending_connection.event_emitter);
conn
}

Expand Down Expand Up @@ -390,8 +393,8 @@ impl Connection {
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
fn close(&mut self, reason: ConnectionClosedReason) {
self.pool_manager.take();
if let Some(ref handler) = self.handler {
handler.handle_connection_closed_event(self.closed_event(reason));
if let Some(ref event_emitter) = self.event_emitter {
event_emitter.emit_event(|| CmapEvent::ConnectionClosed(self.closed_event(reason)));
}
}

Expand All @@ -404,7 +407,7 @@ impl Connection {
address: self.address.clone(),
generation: self.generation,
stream: std::mem::replace(&mut self.stream, BufStream::new(AsyncStream::Null)),
handler: self.handler.take(),
event_emitter: self.event_emitter.take(),
stream_description: self.stream_description.take(),
command_executing: self.command_executing,
error: self.error,
Expand Down Expand Up @@ -571,7 +574,7 @@ pub(crate) struct PendingConnection {
pub(crate) id: u32,
pub(crate) address: ServerAddress,
pub(crate) generation: PoolGeneration,
pub(crate) event_handler: Option<Arc<dyn CmapEventHandler>>,
pub(crate) event_emitter: CmapEventEmitter,
}

impl PendingConnection {
Expand Down
57 changes: 24 additions & 33 deletions src/cmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ pub(crate) mod options;
mod status;
mod worker;

use std::sync::Arc;

use derivative::Derivative;
#[cfg(test)]
use tokio::sync::oneshot;
Expand All @@ -30,7 +28,8 @@ use crate::{
bson::oid::ObjectId,
error::{Error, Result},
event::cmap::{
CmapEventHandler,
CmapEvent,
CmapEventEmitter,
ConnectionCheckoutFailedEvent,
ConnectionCheckoutFailedReason,
ConnectionCheckoutStartedEvent,
Expand Down Expand Up @@ -60,40 +59,44 @@ pub(crate) struct ConnectionPool {
generation_subscriber: PoolGenerationSubscriber,

#[derivative(Debug = "ignore")]
event_handler: Option<Arc<dyn CmapEventHandler>>,
event_emitter: CmapEventEmitter,
}

impl ConnectionPool {
pub(crate) fn new(
address: ServerAddress,
connection_establisher: ConnectionEstablisher,
server_updater: TopologyUpdater,
topology_id: ObjectId,
options: Option<ConnectionPoolOptions>,
) -> Self {
let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());

let event_emitter = CmapEventEmitter::new(event_handler, topology_id);

let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start(
address.clone(),
connection_establisher,
server_updater,
event_emitter.clone(),
options.clone(),
);

let event_handler = options
.as_ref()
.and_then(|opts| opts.cmap_event_handler.clone());

if let Some(ref handler) = event_handler {
handler.handle_pool_created_event(PoolCreatedEvent {
event_emitter.emit_event(|| {
CmapEvent::PoolCreated(PoolCreatedEvent {
address: address.clone(),
options: options.map(|o| o.to_event_options()),
});
};
})
});

Self {
address,
manager,
connection_requester,
generation_subscriber,
event_handler,
event_emitter,
}
}

Expand All @@ -109,29 +112,18 @@ impl ConnectionPool {
manager,
connection_requester,
generation_subscriber,
event_handler: None,
}
}

fn emit_event<F>(&self, emit: F)
where
F: FnOnce(&Arc<dyn CmapEventHandler>),
{
if let Some(ref handler) = self.event_handler {
emit(handler);
event_emitter: CmapEventEmitter::new(None, ObjectId::new()),
}
}

/// Checks out a connection from the pool. This method will yield until this thread is at the
/// front of the wait queue, and then will block again if no available connections are in the
/// pool and the total number of connections is not less than the max pool size.
pub(crate) async fn check_out(&self) -> Result<Connection> {
self.emit_event(|handler| {
let event = ConnectionCheckoutStartedEvent {
self.event_emitter.emit_event(|| {
CmapEvent::ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent {
address: self.address.clone(),
};

handler.handle_connection_checkout_started_event(event);
})
});

let response = self.connection_requester.request().await;
Expand All @@ -146,13 +138,12 @@ impl ConnectionPool {

match conn {
Ok(ref conn) => {
self.emit_event(|handler| {
handler.handle_connection_checked_out_event(conn.checked_out_event());
});
self.event_emitter
.emit_event(|| CmapEvent::ConnectionCheckedOut(conn.checked_out_event()));
}
Err(_) => {
self.emit_event(|handler| {
handler.handle_connection_checkout_failed_event(ConnectionCheckoutFailedEvent {
self.event_emitter.emit_event(|| {
CmapEvent::ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent {
address: self.address.clone(),
reason: ConnectionCheckoutFailedReason::ConnectionError,
})
Expand Down
9 changes: 7 additions & 2 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async fn acquire_connection_and_send_command() {
)
.unwrap(),
TopologyUpdater::channel().0,
bson::oid::ObjectId::new(),
Some(pool_options),
);
let mut connection = pool.check_out().await.unwrap();
Expand Down Expand Up @@ -128,7 +129,8 @@ async fn concurrent_connections() {
let handler = Arc::new(EventHandler::new());
let client_options = CLIENT_OPTIONS.get().await.clone();
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.cmap_event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
options.ready = Some(true);

let pool = ConnectionPool::new(
Expand All @@ -139,6 +141,7 @@ async fn concurrent_connections() {
)
.unwrap(),
TopologyUpdater::channel().0,
bson::oid::ObjectId::new(),
Some(options),
);

Expand Down Expand Up @@ -221,7 +224,8 @@ async fn connection_error_during_establishment() {

let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.ready = Some(true);
options.cmap_event_handler = Some(handler.clone() as Arc<dyn crate::cmap::CmapEventHandler>);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
let pool = ConnectionPool::new(
client_options.hosts[0].clone(),
ConnectionEstablisher::new(
Expand All @@ -230,6 +234,7 @@ async fn connection_error_during_establishment() {
)
.unwrap(),
TopologyUpdater::channel().0,
bson::oid::ObjectId::new(),
Some(options),
);

Expand Down
7 changes: 6 additions & 1 deletion src/cmap/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl Executor {
)
.unwrap(),
updater,
bson::oid::ObjectId::new(),
Some(self.pool_options),
);

Expand Down Expand Up @@ -478,5 +479,9 @@ async fn cmap_spec_tests() {
}
}

run_spec_test(&["connection-monitoring-and-pooling"], run_cmap_spec_tests).await;
run_spec_test(
&["connection-monitoring-and-pooling", "cmap-format"],
run_cmap_spec_tests,
)
.await;
}
Loading