diff --git a/Cargo.toml b/Cargo.toml index a2646c9b9..4b3295718 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ bitflags = "1.1.0" bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" } chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] } derivative = "2.1.1" +derive_more = "0.99.17" flate2 = { version = "1.0", optional = true } futures-io = "0.3.21" futures-core = "0.3.14" @@ -168,7 +169,6 @@ features = ["v4"] approx = "0.5.1" async_once = "0.2.6" ctrlc = "3.2.2" -derive_more = "0.99.13" function_name = "0.2.1" futures = "0.3" hex = "0.4" diff --git a/src/change_stream/mod.rs b/src/change_stream/mod.rs index eaf686942..9deff9939 100644 --- a/src/change_stream/mod.rs +++ b/src/change_stream/mod.rs @@ -180,6 +180,11 @@ where pub(crate) fn current_batch(&self) -> &VecDeque { self.cursor.current_batch() } + + #[cfg(test)] + pub(crate) fn client(&self) -> &crate::Client { + self.cursor.client() + } } /// Arguments passed to a `watch` method, captured to allow resume. diff --git a/src/client/options/mod.rs b/src/client/options/mod.rs index 29dc83c53..8dab1f1ff 100644 --- a/src/client/options/mod.rs +++ b/src/client/options/mod.rs @@ -46,7 +46,7 @@ pub use resolver_config::ResolverConfig; #[cfg(feature = "csfle")] pub use crate::client::csfle::options::AutoEncryptionOptions; -const DEFAULT_PORT: u16 = 27017; +pub(crate) const DEFAULT_PORT: u16 = 27017; const URI_OPTIONS: &[&str] = &[ "appname", diff --git a/src/cmap/conn/mod.rs b/src/cmap/conn/mod.rs index 5750fe025..9f0608b8b 100644 --- a/src/cmap/conn/mod.rs +++ b/src/cmap/conn/mod.rs @@ -22,7 +22,7 @@ use crate::{ compression::Compressor, error::{load_balanced_mode_mismatch, Error, ErrorKind, Result}, event::cmap::{ - CmapEventHandler, + CmapEventEmitter, ConnectionCheckedInEvent, ConnectionCheckedOutEvent, ConnectionClosedEvent, @@ -81,10 +81,10 @@ pub(crate) struct Connection { /// been read. command_executing: bool, - /// Whether or not this connection has experienced a network error while reading or writing. - /// Once the connection has received an error, it should not be used again or checked back - /// into a pool. - error: bool, + /// Stores a network error encountered while reading or writing. Once the connection has + /// received an error, it should not be used again and will be closed upon check-in to the + /// pool. + error: Option, /// Whether the most recently received message included the moreToCome flag, indicating the /// server may send more responses without any additional requests. Attempting to send new @@ -106,8 +106,10 @@ pub(crate) struct Connection { /// connection to the pin holder. pinned_sender: Option>, + /// Type responsible for emitting events related to this connection. This is None for + /// monitoring connections as we do not emit events for those. #[derivative(Debug = "ignore")] - handler: Option>, + event_emitter: Option, } impl Connection { @@ -126,9 +128,9 @@ impl Connection { ready_and_available_time: None, stream: BufStream::new(stream), address, - handler: None, + event_emitter: None, stream_description: None, - error: false, + error: None, pinned_sender: None, compressor: None, more_to_come: false, @@ -149,7 +151,7 @@ impl Connection { pending_connection.id, generation, ); - conn.handler = pending_connection.event_handler; + conn.event_emitter = Some(pending_connection.event_emitter); conn } @@ -211,7 +213,7 @@ impl Connection { /// Checks if the connection experienced a network error and should be closed. pub(super) fn has_errored(&self) -> bool { - self.error + self.error.is_some() } /// Helper to create a `ConnectionCheckedOutEvent` for the connection. @@ -244,6 +246,8 @@ impl Connection { address: self.address.clone(), connection_id: self.id, reason, + #[cfg(feature = "tracing-unstable")] + error: self.error.clone(), } } @@ -272,7 +276,9 @@ impl Connection { _ => message.write_to(&mut self.stream).await, }; - self.error = write_result.is_err(); + if let Err(ref err) = write_result { + self.error = Some(err.clone()); + } write_result?; let response_message_result = Message::read_from( @@ -283,7 +289,9 @@ impl Connection { ) .await; self.command_executing = false; - self.error = response_message_result.is_err(); + if let Err(ref err) = response_message_result { + self.error = Some(err.clone()); + } let response_message = response_message_result?; self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME); @@ -342,7 +350,9 @@ impl Connection { ) .await; self.command_executing = false; - self.error = response_message_result.is_err(); + if let Err(ref err) = response_message_result { + self.error = Some(err.clone()); + } let response_message = response_message_result?; self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME); @@ -390,8 +400,8 @@ impl Connection { /// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason. fn close(&mut self, reason: ConnectionClosedReason) { self.pool_manager.take(); - if let Some(ref handler) = self.handler { - handler.handle_connection_closed_event(self.closed_event(reason)); + if let Some(ref event_emitter) = self.event_emitter { + event_emitter.emit_event(|| self.closed_event(reason).into()); } } @@ -404,10 +414,10 @@ impl Connection { address: self.address.clone(), generation: self.generation, stream: std::mem::replace(&mut self.stream, BufStream::new(AsyncStream::Null)), - handler: self.handler.take(), + event_emitter: self.event_emitter.take(), stream_description: self.stream_description.take(), command_executing: self.command_executing, - error: self.error, + error: self.error.take(), pool_manager: None, ready_and_available_time: None, pinned_sender: self.pinned_sender.clone(), @@ -571,7 +581,7 @@ pub(crate) struct PendingConnection { pub(crate) id: u32, pub(crate) address: ServerAddress, pub(crate) generation: PoolGeneration, - pub(crate) event_handler: Option>, + pub(crate) event_emitter: CmapEventEmitter, } impl PendingConnection { diff --git a/src/cmap/mod.rs b/src/cmap/mod.rs index 46995ab79..a942096df 100644 --- a/src/cmap/mod.rs +++ b/src/cmap/mod.rs @@ -9,8 +9,6 @@ pub(crate) mod options; mod status; mod worker; -use std::sync::Arc; - use derivative::Derivative; #[cfg(test)] use tokio::sync::oneshot; @@ -30,7 +28,8 @@ use crate::{ bson::oid::ObjectId, error::{Error, Result}, event::cmap::{ - CmapEventHandler, + CmapEvent, + CmapEventEmitter, ConnectionCheckoutFailedEvent, ConnectionCheckoutFailedReason, ConnectionCheckoutStartedEvent, @@ -60,7 +59,7 @@ pub(crate) struct ConnectionPool { generation_subscriber: PoolGenerationSubscriber, #[derivative(Debug = "ignore")] - event_handler: Option>, + event_emitter: CmapEventEmitter, } impl ConnectionPool { @@ -68,32 +67,36 @@ impl ConnectionPool { address: ServerAddress, connection_establisher: ConnectionEstablisher, server_updater: TopologyUpdater, + topology_id: ObjectId, options: Option, ) -> Self { + let event_handler = options + .as_ref() + .and_then(|opts| opts.cmap_event_handler.clone()); + + let event_emitter = CmapEventEmitter::new(event_handler, topology_id); + let (manager, connection_requester, generation_subscriber) = ConnectionPoolWorker::start( address.clone(), connection_establisher, server_updater, + event_emitter.clone(), options.clone(), ); - let event_handler = options - .as_ref() - .and_then(|opts| opts.cmap_event_handler.clone()); - - if let Some(ref handler) = event_handler { - handler.handle_pool_created_event(PoolCreatedEvent { + event_emitter.emit_event(|| { + CmapEvent::PoolCreated(PoolCreatedEvent { address: address.clone(), options: options.map(|o| o.to_event_options()), - }); - }; + }) + }); Self { address, manager, connection_requester, generation_subscriber, - event_handler, + event_emitter, } } @@ -109,16 +112,7 @@ impl ConnectionPool { manager, connection_requester, generation_subscriber, - event_handler: None, - } - } - - fn emit_event(&self, emit: F) - where - F: FnOnce(&Arc), - { - if let Some(ref handler) = self.event_handler { - emit(handler); + event_emitter: CmapEventEmitter::new(None, ObjectId::new()), } } @@ -126,12 +120,11 @@ impl ConnectionPool { /// front of the wait queue, and then will block again if no available connections are in the /// pool and the total number of connections is not less than the max pool size. pub(crate) async fn check_out(&self) -> Result { - self.emit_event(|handler| { - let event = ConnectionCheckoutStartedEvent { + self.event_emitter.emit_event(|| { + ConnectionCheckoutStartedEvent { address: self.address.clone(), - }; - - handler.handle_connection_checkout_started_event(event); + } + .into() }); let response = self.connection_requester.request().await; @@ -146,16 +139,28 @@ impl ConnectionPool { match conn { Ok(ref conn) => { - self.emit_event(|handler| { - handler.handle_connection_checked_out_event(conn.checked_out_event()); + self.event_emitter + .emit_event(|| conn.checked_out_event().into()); + } + #[cfg(feature = "tracing-unstable")] + Err(ref err) => { + self.event_emitter.emit_event(|| { + ConnectionCheckoutFailedEvent { + address: self.address.clone(), + reason: ConnectionCheckoutFailedReason::ConnectionError, + error: Some(err.clone()), + } + .into() }); } + #[cfg(not(feature = "tracing-unstable"))] Err(_) => { - self.emit_event(|handler| { - handler.handle_connection_checkout_failed_event(ConnectionCheckoutFailedEvent { + self.event_emitter.emit_event(|| { + ConnectionCheckoutFailedEvent { address: self.address.clone(), reason: ConnectionCheckoutFailedReason::ConnectionError, - }) + } + .into() }); } } diff --git a/src/cmap/test/event.rs b/src/cmap/test/event.rs index 5c35c9a95..62ddd7b07 100644 --- a/src/cmap/test/event.rs +++ b/src/cmap/test/event.rs @@ -1,17 +1,14 @@ -use std::{ - sync::{Arc, RwLock}, - time::Duration, -}; +use std::sync::{Arc, RwLock}; use serde::{de::Unexpected, Deserialize, Deserializer, Serialize}; -use crate::{event::cmap::*, options::ServerAddress, runtime}; -use tokio::sync::broadcast::error::{RecvError, SendError}; +use crate::{event::cmap::*, options::ServerAddress, test::util::EventSubscriber}; +use tokio::sync::broadcast::error::SendError; #[derive(Clone, Debug)] pub struct EventHandler { - pub events: Arc>>, - channel_sender: tokio::sync::broadcast::Sender, + pub(crate) events: Arc>>, + channel_sender: tokio::sync::broadcast::Sender, } impl EventHandler { @@ -23,19 +20,16 @@ impl EventHandler { } } - fn handle>(&self, event: E) { + fn handle>(&self, event: E) { let event = event.into(); // this only errors if no receivers are listening which isn't a concern here. - let _: std::result::Result> = + let _: std::result::Result> = self.channel_sender.send(event.clone()); self.events.write().unwrap().push(event); } - pub fn subscribe(&self) -> EventSubscriber { - EventSubscriber { - _handler: self, - receiver: self.channel_sender.subscribe(), - } + pub(crate) fn subscribe(&self) -> EventSubscriber<'_, EventHandler, CmapEvent> { + EventSubscriber::new(self, self.channel_sender.subscribe()) } } @@ -85,76 +79,7 @@ impl CmapEventHandler for EventHandler { } } -pub struct EventSubscriber<'a> { - /// A reference to the handler this subscriber is receiving events from. - /// Stored here to ensure this subscriber cannot outlive the handler that is generating its - /// events. - _handler: &'a EventHandler, - receiver: tokio::sync::broadcast::Receiver, -} - -impl EventSubscriber<'_> { - pub async fn wait_for_event(&mut self, timeout: Duration, filter: F) -> Option - where - F: Fn(&Event) -> bool, - { - runtime::timeout(timeout, async { - loop { - match self.receiver.recv().await { - Ok(event) if filter(&event) => return event.into(), - // the channel hit capacity and the channnel will skip a few to catch up. - Err(RecvError::Lagged(_)) => continue, - Err(_) => return None, - _ => continue, - } - } - }) - .await - .ok() - .flatten() - } - - /// Returns the received events without waiting for any more. - pub fn all(&mut self, filter: F) -> Vec - where - F: Fn(&Event) -> bool, - { - let mut events = Vec::new(); - while let Ok(event) = self.receiver.try_recv() { - if filter(&event) { - events.push(event); - } - } - events - } -} - -#[allow(clippy::large_enum_variant)] -#[derive(Clone, Debug, Deserialize, From, PartialEq)] -#[serde(tag = "type")] -pub enum Event { - #[serde( - deserialize_with = "self::deserialize_pool_created", - rename = "ConnectionPoolCreated" - )] - PoolCreated(PoolCreatedEvent), - #[serde(rename = "ConnectionPoolClosed")] - PoolClosed(PoolClosedEvent), - #[serde(rename = "ConnectionPoolReady")] - PoolReady(PoolReadyEvent), - ConnectionCreated(ConnectionCreatedEvent), - ConnectionReady(ConnectionReadyEvent), - ConnectionClosed(ConnectionClosedEvent), - ConnectionCheckOutStarted(ConnectionCheckoutStartedEvent), - #[serde(deserialize_with = "self::deserialize_checkout_failed")] - ConnectionCheckOutFailed(ConnectionCheckoutFailedEvent), - ConnectionCheckedOut(ConnectionCheckedOutEvent), - #[serde(rename = "ConnectionPoolCleared")] - PoolCleared(PoolClearedEvent), - ConnectionCheckedIn(ConnectionCheckedInEvent), -} - -impl Serialize for Event { +impl Serialize for CmapEvent { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, @@ -166,8 +91,8 @@ impl Serialize for Event { Self::ConnectionCreated(event) => event.serialize(serializer), Self::ConnectionReady(event) => event.serialize(serializer), Self::ConnectionClosed(event) => event.serialize(serializer), - Self::ConnectionCheckOutStarted(event) => event.serialize(serializer), - Self::ConnectionCheckOutFailed(event) => event.serialize(serializer), + Self::ConnectionCheckoutStarted(event) => event.serialize(serializer), + Self::ConnectionCheckoutFailed(event) => event.serialize(serializer), Self::ConnectionCheckedOut(event) => event.serialize(serializer), Self::PoolCleared(event) => event.serialize(serializer), Self::ConnectionCheckedIn(event) => event.serialize(serializer), @@ -175,20 +100,64 @@ impl Serialize for Event { } } -impl Event { +impl<'de> Deserialize<'de> for CmapEvent { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(tag = "type")] + // clippy doesn't like that all variants start with the same name, but we use these + // to match the test file names. + #[allow(clippy::enum_variant_names)] + enum EventHelper { + #[serde(deserialize_with = "self::deserialize_pool_created")] + ConnectionPoolCreated(PoolCreatedEvent), + ConnectionPoolClosed(PoolClosedEvent), + ConnectionPoolReady(PoolReadyEvent), + ConnectionCreated(ConnectionCreatedEvent), + ConnectionReady(ConnectionReadyEvent), + ConnectionClosed(ConnectionClosedEvent), + ConnectionCheckOutStarted(ConnectionCheckoutStartedEvent), + #[serde(deserialize_with = "self::deserialize_checkout_failed")] + ConnectionCheckOutFailed(ConnectionCheckoutFailedEvent), + ConnectionCheckedOut(ConnectionCheckedOutEvent), + ConnectionPoolCleared(PoolClearedEvent), + ConnectionCheckedIn(ConnectionCheckedInEvent), + } + + let helper = EventHelper::deserialize(deserializer)?; + let event = match helper { + EventHelper::ConnectionPoolCreated(e) => CmapEvent::PoolCreated(e), + EventHelper::ConnectionPoolClosed(e) => CmapEvent::PoolClosed(e), + EventHelper::ConnectionPoolReady(e) => CmapEvent::PoolReady(e), + EventHelper::ConnectionCreated(e) => CmapEvent::ConnectionCreated(e), + EventHelper::ConnectionReady(e) => CmapEvent::ConnectionReady(e), + EventHelper::ConnectionClosed(e) => CmapEvent::ConnectionClosed(e), + EventHelper::ConnectionCheckOutStarted(e) => CmapEvent::ConnectionCheckoutStarted(e), + EventHelper::ConnectionCheckOutFailed(e) => CmapEvent::ConnectionCheckoutFailed(e), + EventHelper::ConnectionCheckedOut(e) => CmapEvent::ConnectionCheckedOut(e), + EventHelper::ConnectionPoolCleared(e) => CmapEvent::PoolCleared(e), + EventHelper::ConnectionCheckedIn(e) => CmapEvent::ConnectionCheckedIn(e), + }; + Ok(event) + } +} + +impl CmapEvent { pub fn name(&self) -> &'static str { match self { - Event::PoolCreated(_) => "ConnectionPoolCreated", - Event::PoolReady(_) => "ConnectionPoolReady", - Event::PoolClosed(_) => "ConnectionPoolClosed", - Event::ConnectionCreated(_) => "ConnectionCreated", - Event::ConnectionReady(_) => "ConnectionReady", - Event::ConnectionClosed(_) => "ConnectionClosed", - Event::ConnectionCheckOutStarted(_) => "ConnectionCheckOutStarted", - Event::ConnectionCheckOutFailed(_) => "ConnectionCheckOutFailed", - Event::ConnectionCheckedOut(_) => "ConnectionCheckedOut", - Event::PoolCleared(_) => "ConnectionPoolCleared", - Event::ConnectionCheckedIn(_) => "ConnectionCheckedIn", + CmapEvent::PoolCreated(_) => "ConnectionPoolCreated", + CmapEvent::PoolReady(_) => "ConnectionPoolReady", + CmapEvent::PoolClosed(_) => "ConnectionPoolClosed", + CmapEvent::ConnectionCreated(_) => "ConnectionCreated", + CmapEvent::ConnectionReady(_) => "ConnectionReady", + CmapEvent::ConnectionClosed(_) => "ConnectionClosed", + CmapEvent::ConnectionCheckoutStarted(_) => "ConnectionCheckOutStarted", + CmapEvent::ConnectionCheckoutFailed(_) => "ConnectionCheckOutFailed", + CmapEvent::ConnectionCheckedOut(_) => "ConnectionCheckedOut", + CmapEvent::PoolCleared(_) => "ConnectionPoolCleared", + CmapEvent::ConnectionCheckedIn(_) => "ConnectionCheckedIn", } } @@ -196,17 +165,17 @@ impl Event { // tests. pub fn planned_maintenance_testing_name(&self) -> &'static str { match self { - Event::PoolCreated(_) => "PoolCreatedEvent", - Event::PoolReady(_) => "PoolReadyEvent", - Event::PoolCleared(_) => "PoolClearedEvent", - Event::PoolClosed(_) => "PoolClosedEvent", - Event::ConnectionCreated(_) => "ConnectionCreatedEvent", - Event::ConnectionReady(_) => "ConnectionReadyEvent", - Event::ConnectionClosed(_) => "ConnectionClosedEvent", - Event::ConnectionCheckOutStarted(_) => "ConnectionCheckOutStartedEvent", - Event::ConnectionCheckOutFailed(_) => "ConnectionCheckOutFailedEvent", - Event::ConnectionCheckedOut(_) => "ConnectionCheckedOutEvent", - Event::ConnectionCheckedIn(_) => "ConnectionCheckedInEvent", + CmapEvent::PoolCreated(_) => "PoolCreatedEvent", + CmapEvent::PoolReady(_) => "PoolReadyEvent", + CmapEvent::PoolCleared(_) => "PoolClearedEvent", + CmapEvent::PoolClosed(_) => "PoolClosedEvent", + CmapEvent::ConnectionCreated(_) => "ConnectionCreatedEvent", + CmapEvent::ConnectionReady(_) => "ConnectionReadyEvent", + CmapEvent::ConnectionClosed(_) => "ConnectionClosedEvent", + CmapEvent::ConnectionCheckoutStarted(_) => "ConnectionCheckOutStartedEvent", + CmapEvent::ConnectionCheckoutFailed(_) => "ConnectionCheckOutFailedEvent", + CmapEvent::ConnectionCheckedOut(_) => "ConnectionCheckedOutEvent", + CmapEvent::ConnectionCheckedIn(_) => "ConnectionCheckedInEvent", } } } @@ -297,5 +266,7 @@ where port: None, }, reason, + #[cfg(feature = "tracing-unstable")] + error: None, }) } diff --git a/src/cmap/test/file.rs b/src/cmap/test/file.rs index bb7390ed8..d8e6082d7 100644 --- a/src/cmap/test/file.rs +++ b/src/cmap/test/file.rs @@ -2,8 +2,14 @@ use std::{sync::Arc, time::Duration}; use serde::Deserialize; -use super::{event::Event, State}; -use crate::{bson_util, cmap::options::ConnectionPoolOptions, error::Result, test::RunOn}; +use super::State; +use crate::{ + bson_util, + cmap::options::ConnectionPoolOptions, + error::Result, + event::cmap::CmapEvent, + test::RunOn, +}; use bson::Document; #[derive(Debug, Deserialize)] @@ -17,7 +23,7 @@ pub struct TestFile { pub(crate) pool_options: Option, pub operations: Vec, pub error: Option, - pub events: Vec, + pub(crate) events: Vec, #[serde(default)] pub ignore: Vec, pub fail_point: Option, diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index efaa7d02c..7d9c5ed06 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -1,10 +1,7 @@ use serde::Deserialize; use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; -use super::{ - event::{Event, EventHandler}, - EVENT_TIMEOUT, -}; +use super::{event::EventHandler, EVENT_TIMEOUT}; use crate::{ bson::{doc, Document}, cmap::{ @@ -13,7 +10,7 @@ use crate::{ Command, ConnectionPool, }, - event::cmap::{CmapEventHandler, ConnectionClosedReason}, + event::cmap::{CmapEvent, CmapEventHandler, ConnectionClosedReason}, hello::LEGACY_HELLO_COMMAND_NAME, operation::CommandResponse, runtime, @@ -59,6 +56,7 @@ async fn acquire_connection_and_send_command() { ) .unwrap(), TopologyUpdater::channel().0, + bson::oid::ObjectId::new(), Some(pool_options), ); let mut connection = pool.check_out().await.unwrap(); @@ -128,7 +126,8 @@ async fn concurrent_connections() { let handler = Arc::new(EventHandler::new()); let client_options = CLIENT_OPTIONS.get().await.clone(); let mut options = ConnectionPoolOptions::from_client_options(&client_options); - options.cmap_event_handler = Some(handler.clone() as Arc); + options.cmap_event_handler = + Some(handler.clone() as Arc); options.ready = Some(true); let pool = ConnectionPool::new( @@ -139,6 +138,7 @@ async fn concurrent_connections() { ) .unwrap(), TopologyUpdater::channel().0, + bson::oid::ObjectId::new(), Some(options), ); @@ -156,10 +156,10 @@ async fn concurrent_connections() { let mut consecutive_creations = 0; for event in events.iter() { match event { - Event::ConnectionCreated(_) => { + CmapEvent::ConnectionCreated(_) => { consecutive_creations += 1; } - Event::ConnectionReady(_) => { + CmapEvent::ConnectionReady(_) => { assert!( consecutive_creations >= 2, "connections not created concurrently" @@ -221,7 +221,8 @@ async fn connection_error_during_establishment() { let mut options = ConnectionPoolOptions::from_client_options(&client_options); options.ready = Some(true); - options.cmap_event_handler = Some(handler.clone() as Arc); + options.cmap_event_handler = + Some(handler.clone() as Arc); let pool = ConnectionPool::new( client_options.hosts[0].clone(), ConnectionEstablisher::new( @@ -230,6 +231,7 @@ async fn connection_error_during_establishment() { ) .unwrap(), TopologyUpdater::channel().0, + bson::oid::ObjectId::new(), Some(options), ); @@ -237,7 +239,7 @@ async fn connection_error_during_establishment() { subscriber .wait_for_event(EVENT_TIMEOUT, |e| match e { - Event::ConnectionClosed(event) => { + CmapEvent::ConnectionClosed(event) => { event.connection_id == 1 && event.reason == ConnectionClosedReason::Error } _ => false, @@ -281,7 +283,7 @@ async fn connection_error_during_operation() { subscriber .wait_for_event(EVENT_TIMEOUT, |e| match e { - Event::ConnectionClosed(event) => { + CmapEvent::ConnectionClosed(event) => { event.connection_id == 1 && event.reason == ConnectionClosedReason::Error } _ => false, diff --git a/src/cmap/test/mod.rs b/src/cmap/test/mod.rs index 3b7cc4742..739cf74f4 100644 --- a/src/cmap/test/mod.rs +++ b/src/cmap/test/mod.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration}; use tokio::sync::{Mutex, RwLock, RwLockWriteGuard}; use self::{ - event::{Event, EventHandler}, + event::EventHandler, file::{Operation, TestFile, ThreadedOperation}, }; @@ -19,7 +19,7 @@ use crate::{ ConnectionPoolOptions, }, error::{Error, ErrorKind, Result}, - event::cmap::ConnectionPoolOptions as EventOptions, + event::cmap::{CmapEvent, ConnectionPoolOptions as EventOptions}, options::TlsOptions, runtime, runtime::AsyncJoinHandle, @@ -63,7 +63,7 @@ struct Executor { description: String, operations: Vec, error: Option, - events: Vec, + events: Vec, state: Arc, ignored_event_names: Vec, pool_options: ConnectionPoolOptions, @@ -165,6 +165,7 @@ impl Executor { ) .unwrap(), updater, + bson::oid::ObjectId::new(), Some(self.pool_options), ); @@ -207,7 +208,7 @@ impl Executor { let ignored_event_names = self.ignored_event_names; let description = self.description; - let filter = |e: &Event| !ignored_event_names.iter().any(|name| e.name() == name); + let filter = |e: &CmapEvent| !ignored_event_names.iter().any(|name| e.name() == name); for expected_event in self.events { let actual_event = subscriber .wait_for_event(EVENT_TIMEOUT, filter) @@ -279,7 +280,7 @@ impl Operation { // wait for event to be emitted to ensure check in has completed. subscriber .wait_for_event(EVENT_TIMEOUT, |e| { - matches!(e, Event::ConnectionCheckedIn(event) if event.connection_id == id) + matches!(e, CmapEvent::ConnectionCheckedIn(event) if event.connection_id == id) }) .await .unwrap_or_else(|| { @@ -314,7 +315,7 @@ impl Operation { // wait for event to be emitted to ensure drop has completed. subscriber - .wait_for_event(EVENT_TIMEOUT, |e| matches!(e, Event::PoolClosed(_))) + .wait_for_event(EVENT_TIMEOUT, |e| matches!(e, CmapEvent::PoolClosed(_))) .await .expect("did not receive ConnectionPoolClosed event after closing pool"); } @@ -374,19 +375,19 @@ impl Matchable for EventOptions { } } -impl Matchable for Event { - fn content_matches(&self, expected: &Event) -> std::result::Result<(), String> { +impl Matchable for CmapEvent { + fn content_matches(&self, expected: &CmapEvent) -> std::result::Result<(), String> { match (self, expected) { - (Event::PoolCreated(actual), Event::PoolCreated(ref expected)) => { + (CmapEvent::PoolCreated(actual), CmapEvent::PoolCreated(ref expected)) => { actual.options.matches(&expected.options) } - (Event::ConnectionCreated(actual), Event::ConnectionCreated(ref expected)) => { + (CmapEvent::ConnectionCreated(actual), CmapEvent::ConnectionCreated(ref expected)) => { actual.connection_id.matches(&expected.connection_id) } - (Event::ConnectionReady(actual), Event::ConnectionReady(ref expected)) => { + (CmapEvent::ConnectionReady(actual), CmapEvent::ConnectionReady(ref expected)) => { actual.connection_id.matches(&expected.connection_id) } - (Event::ConnectionClosed(actual), Event::ConnectionClosed(ref expected)) => { + (CmapEvent::ConnectionClosed(actual), CmapEvent::ConnectionClosed(ref expected)) => { eq_matches("reason", &actual.reason, &expected.reason)?; actual .connection_id @@ -394,15 +395,17 @@ impl Matchable for Event { .prefix("connection_id")?; Ok(()) } - (Event::ConnectionCheckedOut(actual), Event::ConnectionCheckedOut(ref expected)) => { - actual.connection_id.matches(&expected.connection_id) - } - (Event::ConnectionCheckedIn(actual), Event::ConnectionCheckedIn(ref expected)) => { - actual.connection_id.matches(&expected.connection_id) - } ( - Event::ConnectionCheckOutFailed(actual), - Event::ConnectionCheckOutFailed(ref expected), + CmapEvent::ConnectionCheckedOut(actual), + CmapEvent::ConnectionCheckedOut(ref expected), + ) => actual.connection_id.matches(&expected.connection_id), + ( + CmapEvent::ConnectionCheckedIn(actual), + CmapEvent::ConnectionCheckedIn(ref expected), + ) => actual.connection_id.matches(&expected.connection_id), + ( + CmapEvent::ConnectionCheckoutFailed(actual), + CmapEvent::ConnectionCheckoutFailed(ref expected), ) => { if actual.reason == expected.reason { Ok(()) @@ -413,10 +416,12 @@ impl Matchable for Event { )) } } - (Event::ConnectionCheckOutStarted(_), Event::ConnectionCheckOutStarted(_)) => Ok(()), - (Event::PoolCleared(_), Event::PoolCleared(_)) => Ok(()), - (Event::PoolReady(_), Event::PoolReady(_)) => Ok(()), - (Event::PoolClosed(_), Event::PoolClosed(_)) => Ok(()), + (CmapEvent::ConnectionCheckoutStarted(_), CmapEvent::ConnectionCheckoutStarted(_)) => { + Ok(()) + } + (CmapEvent::PoolCleared(_), CmapEvent::PoolCleared(_)) => Ok(()), + (CmapEvent::PoolReady(_), CmapEvent::PoolReady(_)) => Ok(()), + (CmapEvent::PoolClosed(_), CmapEvent::PoolClosed(_)) => Ok(()), (actual, expected) => Err(format!("expected event {:?}, got {:?}", actual, expected)), } } @@ -478,5 +483,9 @@ async fn cmap_spec_tests() { } } - run_spec_test(&["connection-monitoring-and-pooling"], run_cmap_spec_tests).await; + run_spec_test( + &["connection-monitoring-and-pooling", "cmap-format"], + run_cmap_spec_tests, + ) + .await; } diff --git a/src/cmap/worker.rs b/src/cmap/worker.rs index 1a9765303..69d794a2c 100644 --- a/src/cmap/worker.rs +++ b/src/cmap/worker.rs @@ -23,7 +23,7 @@ use crate::{ client::auth::Credential, error::{load_balanced_mode_mismatch, Error, ErrorKind, Result}, event::cmap::{ - CmapEventHandler, + CmapEventEmitter, ConnectionClosedEvent, ConnectionClosedReason, PoolClearedEvent, @@ -37,7 +37,6 @@ use crate::{ use std::{ collections::{HashMap, VecDeque}, - sync::Arc, time::Duration, }; @@ -82,8 +81,9 @@ pub(crate) struct ConnectionPoolWorker { /// The credential used to authenticate connections, if any. credential: Option, - /// The event handler specified by the user to process CMAP events. - event_handler: Option>, + /// The type responsible for emitting CMAP events both to an optional user-specified handler + /// and as tracing events. + event_emitter: CmapEventEmitter, /// The time between maintenance tasks. maintenance_frequency: Duration, @@ -139,12 +139,9 @@ impl ConnectionPoolWorker { address: ServerAddress, establisher: ConnectionEstablisher, server_updater: TopologyUpdater, + event_emitter: CmapEventEmitter, options: Option, ) -> (PoolManager, ConnectionRequester, PoolGenerationSubscriber) { - let event_handler = options - .as_ref() - .and_then(|opts| opts.cmap_event_handler.clone()); - // The CMAP spec indicates that a max idle time of zero means that connections should not be // closed due to idleness. let mut max_idle_time = options.as_ref().and_then(|opts| opts.max_idle_time); @@ -209,7 +206,7 @@ impl ConnectionPoolWorker { let worker = ConnectionPoolWorker { address, - event_handler: event_handler.clone(), + event_emitter, max_idle_time, min_pool_size, credential, @@ -328,10 +325,11 @@ impl ConnectionPoolWorker { connection.close_and_drop(ConnectionClosedReason::PoolClosed); } - self.emit_event(|handler| { - handler.handle_pool_closed_event(PoolClosedEvent { + self.event_emitter.emit_event(|| { + PoolClosedEvent { address: self.address.clone(), - }); + } + .into() }); } @@ -380,7 +378,7 @@ impl ConnectionPoolWorker { // otherwise, attempt to create a connection. if self.below_max_connections() { - let event_handler = self.event_handler.clone(); + let event_emitter = self.event_emitter.clone(); let establisher = self.establisher.clone(); let pending_connection = self.create_pending_connection(); let manager = self.manager.clone(); @@ -394,7 +392,7 @@ impl ConnectionPoolWorker { server_updater, &manager, credential, - event_handler, + event_emitter, ) .await; @@ -427,12 +425,11 @@ impl ConnectionPoolWorker { id: self.next_connection_id, address: self.address.clone(), generation: self.generation.clone(), - event_handler: self.event_handler.clone(), + event_emitter: self.event_emitter.clone(), }; self.next_connection_id += 1; - self.emit_event(|handler| { - handler.handle_connection_created_event(pending_connection.created_event()) - }); + self.event_emitter + .emit_event(|| pending_connection.created_event().into()); pending_connection } @@ -461,9 +458,8 @@ impl ConnectionPoolWorker { } fn check_in(&mut self, mut conn: Connection) { - self.emit_event(|handler| { - handler.handle_connection_checked_in_event(conn.checked_in_event()); - }); + self.event_emitter + .emit_event(|| conn.checked_in_event().into()); conn.mark_as_available(); @@ -495,13 +491,12 @@ impl ConnectionPoolWorker { self.generation_publisher.publish(self.generation.clone()); if was_ready { - self.emit_event(|handler| { - let event = PoolClearedEvent { + self.event_emitter.emit_event(|| { + PoolClearedEvent { address: self.address.clone(), service_id, - }; - - handler.handle_pool_cleared_event(event); + } + .into() }); if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) { @@ -521,24 +516,14 @@ impl ConnectionPoolWorker { } self.state = PoolState::Ready; - self.emit_event(|handler| { - let event = PoolReadyEvent { + self.event_emitter.emit_event(|| { + PoolReadyEvent { address: self.address.clone(), - }; - - handler.handle_pool_ready_event(event); + } + .into() }); } - fn emit_event(&self, emit: F) - where - F: FnOnce(&Arc), - { - if let Some(ref handler) = self.event_handler { - emit(handler); - } - } - /// Close a connection, emit the event for it being closed, and decrement the /// total connection count. #[allow(clippy::single_match)] @@ -596,7 +581,7 @@ impl ConnectionPoolWorker { && self.pending_connection_count < MAX_CONNECTING { let pending_connection = self.create_pending_connection(); - let event_handler = self.event_handler.clone(); + let event_handler = self.event_emitter.clone(); let manager = self.manager.clone(); let establisher = self.establisher.clone(); let updater = self.server_updater.clone(); @@ -633,7 +618,7 @@ async fn establish_connection( server_updater: TopologyUpdater, manager: &PoolManager, credential: Option, - event_handler: Option>, + event_emitter: CmapEventEmitter, ) -> Result { let connection_id = pending_connection.id; let address = pending_connection.address.clone(); @@ -651,20 +636,20 @@ async fn establish_connection( e.handshake_phase.clone(), ) .await; - if let Some(handler) = event_handler { - let event = ConnectionClosedEvent { + event_emitter.emit_event(|| { + ConnectionClosedEvent { address, reason: ConnectionClosedReason::Error, connection_id, - }; - handler.handle_connection_closed_event(event); - } + #[cfg(feature = "tracing-unstable")] + error: Some(e.cause.clone()), + } + .into() + }); manager.handle_connection_failed(); } Ok(ref mut connection) => { - if let Some(handler) = event_handler { - handler.handle_connection_ready_event(connection.ready_event()) - }; + event_emitter.emit_event(|| connection.ready_event().into()); } } diff --git a/src/coll/mod.rs b/src/coll/mod.rs index ed9bfee0f..4637e0ce8 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -180,7 +180,7 @@ impl Collection { } /// Get the `Client` that this collection descended from. - fn client(&self) -> &Client { + pub(crate) fn client(&self) -> &Client { &self.inner.client } diff --git a/src/cursor/session.rs b/src/cursor/session.rs index 8e2651cca..79ec91648 100644 --- a/src/cursor/session.rs +++ b/src/cursor/session.rs @@ -352,6 +352,11 @@ impl SessionCursor { pub(crate) fn is_exhausted(&self) -> bool { self.state.as_ref().unwrap().exhausted } + + #[cfg(test)] + pub(crate) fn client(&self) -> &Client { + &self.client + } } impl Drop for SessionCursor { diff --git a/src/event/cmap.rs b/src/event/cmap.rs index c7479e5c1..312f42f36 100644 --- a/src/event/cmap.rs +++ b/src/event/cmap.rs @@ -1,11 +1,21 @@ //! Contains the events and functionality for monitoring behavior of the connection pooling of a //! `Client`. -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use serde::{Deserialize, Serialize}; use crate::{bson::oid::ObjectId, bson_util, options::ServerAddress}; +use derivative::Derivative; +use derive_more::From; + +#[cfg(feature = "tracing-unstable")] +use crate::trace::{ + connection::ConnectionTracingEventEmitter, + trace_or_log_enabled, + TracingOrLogLevel, + CONNECTION_TRACING_EVENT_TARGET, +}; /// We implement `Deserialize` for all of the event types so that we can more easily parse the CMAP /// spec tests. However, we have no need to parse the address field from the JSON files (if it's @@ -128,7 +138,8 @@ pub struct ConnectionReadyEvent { } /// Event emitted when a connection is closed. -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, Derivative, Serialize)] +#[derivative(PartialEq)] #[serde(rename_all = "camelCase")] #[non_exhaustive] pub struct ConnectionClosedEvent { @@ -144,6 +155,14 @@ pub struct ConnectionClosedEvent { /// The reason that the connection was closed. pub reason: ConnectionClosedReason, + + /// If the `reason` connection checkout failed was `Error`,the associated + /// error is contained here. This is attached so we can include it in log messages; + /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495 + #[cfg(feature = "tracing-unstable")] + #[serde(skip)] + #[derivative(PartialEq = "ignore")] + pub(crate) error: Option, } /// The reasons that a connection may be closed. @@ -178,7 +197,8 @@ pub struct ConnectionCheckoutStartedEvent { } /// Event emitted when a thread is unable to check out a connection. -#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, Derivative, Serialize)] +#[derivative(PartialEq)] #[non_exhaustive] pub struct ConnectionCheckoutFailedEvent { /// The address of the server that the connection would have connected to. @@ -188,6 +208,14 @@ pub struct ConnectionCheckoutFailedEvent { /// The reason a connection was unable to be checked out. pub reason: ConnectionCheckoutFailedReason, + + /// If the `reason` connection checkout failed was `ConnectionError`,the associated + /// error is contained here. This is attached so we can include it in log messages; + /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495 + #[cfg(feature = "tracing-unstable")] + #[serde(skip)] + #[derivative(PartialEq = "ignore")] + pub(crate) error: Option, } /// The reasons a connection may not be able to be checked out. @@ -327,3 +355,101 @@ pub trait CmapEventHandler: Send + Sync { /// whenever a connection is checked back into a connection pool. fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {} } + +#[derive(Clone, Debug, PartialEq, From)] +pub(crate) enum CmapEvent { + PoolCreated(PoolCreatedEvent), + PoolReady(PoolReadyEvent), + PoolCleared(PoolClearedEvent), + PoolClosed(PoolClosedEvent), + ConnectionCreated(ConnectionCreatedEvent), + ConnectionReady(ConnectionReadyEvent), + ConnectionClosed(ConnectionClosedEvent), + ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent), + ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent), + ConnectionCheckedOut(ConnectionCheckedOutEvent), + ConnectionCheckedIn(ConnectionCheckedInEvent), +} + +#[derive(Clone)] +pub(crate) struct CmapEventEmitter { + user_handler: Option>, + + #[cfg(feature = "tracing-unstable")] + tracing_emitter: ConnectionTracingEventEmitter, +} + +impl CmapEventEmitter { + // the topology ID is only used when the tracing feature is on. + #[allow(unused_variables)] + pub(crate) fn new( + user_handler: Option>, + topology_id: ObjectId, + ) -> CmapEventEmitter { + Self { + user_handler, + #[cfg(feature = "tracing-unstable")] + tracing_emitter: ConnectionTracingEventEmitter::new(topology_id), + } + } + + #[cfg(not(feature = "tracing-unstable"))] + pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) { + if let Some(ref handler) = self.user_handler { + handle_cmap_event(handler.as_ref(), generate_event()); + } + } + + #[cfg(feature = "tracing-unstable")] + pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) { + // if the user isn't actually interested in debug-level connection messages, we shouldn't + // bother with the expense of generating and emitting these events. + let tracing_emitter_to_use = if trace_or_log_enabled!( + target: CONNECTION_TRACING_EVENT_TARGET, + TracingOrLogLevel::Debug + ) { + Some(&self.tracing_emitter) + } else { + None + }; + + match (&self.user_handler, tracing_emitter_to_use) { + (None, None) => {} + (None, Some(tracing_emitter)) => { + let event = generate_event(); + handle_cmap_event(tracing_emitter, event); + } + (Some(user_handler), None) => { + let event = generate_event(); + handle_cmap_event(user_handler.as_ref(), event); + } + (Some(user_handler), Some(tracing_emitter)) => { + let event = generate_event(); + handle_cmap_event(user_handler.as_ref(), event.clone()); + handle_cmap_event(tracing_emitter, event); + } + }; + } +} + +fn handle_cmap_event(handler: &dyn CmapEventHandler, event: CmapEvent) { + match event { + CmapEvent::PoolCreated(event) => handler.handle_pool_created_event(event), + CmapEvent::PoolReady(event) => handler.handle_pool_ready_event(event), + CmapEvent::PoolCleared(event) => handler.handle_pool_cleared_event(event), + CmapEvent::PoolClosed(event) => handler.handle_pool_closed_event(event), + CmapEvent::ConnectionCreated(event) => handler.handle_connection_created_event(event), + CmapEvent::ConnectionReady(event) => handler.handle_connection_ready_event(event), + CmapEvent::ConnectionClosed(event) => handler.handle_connection_closed_event(event), + CmapEvent::ConnectionCheckoutStarted(event) => { + handler.handle_connection_checkout_started_event(event) + } + CmapEvent::ConnectionCheckoutFailed(event) => { + handler.handle_connection_checkout_failed_event(event) + } + CmapEvent::ConnectionCheckedOut(event) => { + handler.handle_connection_checked_out_event(event) + } + CmapEvent::ConnectionCheckedIn(event) => handler.handle_connection_checked_in_event(event), + } +} diff --git a/src/gridfs.rs b/src/gridfs.rs index 662acef24..cf3e51990 100644 --- a/src/gridfs.rs +++ b/src/gridfs.rs @@ -196,6 +196,11 @@ impl GridFsBucket { } } + #[cfg(test)] + pub(crate) fn client(&self) -> &crate::Client { + self.inner.files.client() + } + /// Gets the read concern of the [`GridFsBucket`]. pub fn read_concern(&self) -> Option<&ReadConcern> { self.inner.options.read_concern.as_ref() diff --git a/src/lib.rs b/src/lib.rs index 72188eb65..fb1703a6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -343,10 +343,6 @@ pub mod sync; #[cfg(test)] mod test; -#[cfg(test)] -#[macro_use] -extern crate derive_more; - pub use crate::{ client::{session::ClientSession, Client}, coll::Collection, diff --git a/src/sdam/description/topology/server_selection/test/in_window.rs b/src/sdam/description/topology/server_selection/test/in_window.rs index b18a1c925..aefb50aab 100644 --- a/src/sdam/description/topology/server_selection/test/in_window.rs +++ b/src/sdam/description/topology/server_selection/test/in_window.rs @@ -9,6 +9,7 @@ use tokio::sync::RwLockWriteGuard; use crate::{ coll::options::FindOptions, error::Result, + event::cmap::CmapEvent, options::ServerAddress, runtime, runtime::AsyncJoinHandle, @@ -17,7 +18,6 @@ use crate::{ test::{ log_uncaptured, run_spec_test, - CmapEvent, Event, EventHandler, FailCommandOptions, diff --git a/src/sdam/server.rs b/src/sdam/server.rs index 7e7c416ba..ba6315a2c 100644 --- a/src/sdam/server.rs +++ b/src/sdam/server.rs @@ -37,12 +37,14 @@ impl Server { options: ClientOptions, connection_establisher: ConnectionEstablisher, topology_updater: TopologyUpdater, + topology_id: bson::oid::ObjectId, ) -> Arc { Arc::new(Self { pool: ConnectionPool::new( address.clone(), connection_establisher, topology_updater, + topology_id, Some(ConnectionPoolOptions::from_client_options(&options)), ), address, diff --git a/src/sdam/test.rs b/src/sdam/test.rs index c2770eae2..874648453 100644 --- a/src/sdam/test.rs +++ b/src/sdam/test.rs @@ -12,12 +12,11 @@ use crate::{ client::options::{ClientOptions, ServerAddress}, cmap::RawCommandResponse, error::{Error, ErrorKind}, - event::sdam::SdamEventHandler, + event::{cmap::CmapEvent, sdam::SdamEventHandler}, hello::{LEGACY_HELLO_COMMAND_NAME, LEGACY_HELLO_COMMAND_NAME_LOWERCASE}, sdam::{ServerDescription, Topology}, test::{ log_uncaptured, - CmapEvent, Event, EventClient, EventHandler, diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index 259b50c3f..c7b5d6214 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -58,7 +58,7 @@ use super::{ /// When this is dropped, monitors will stop performing checks. #[derive(Debug)] pub(crate) struct Topology { - #[cfg(feature = "tracing-unstable")] + #[cfg(any(feature = "tracing-unstable", test))] pub(crate) id: ObjectId, watcher: TopologyWatcher, updater: TopologyUpdater, @@ -117,7 +117,7 @@ impl Topology { worker.start(); Ok(Topology { - #[cfg(feature = "tracing-unstable")] + #[cfg(any(feature = "tracing-unstable", test))] id, watcher, updater, @@ -545,6 +545,7 @@ impl TopologyWorker { self.options.clone(), self.connection_establisher.clone(), self.topology_updater.clone(), + self.id, ); self.servers.insert( diff --git a/src/test/client.rs b/src/test/client.rs index 8e5980d9c..407167f1b 100644 --- a/src/test/client.rs +++ b/src/test/client.rs @@ -7,6 +7,7 @@ use tokio::sync::{RwLockReadGuard, RwLockWriteGuard}; use crate::{ bson::{doc, Bson}, error::{CommandError, Error, ErrorKind}, + event::cmap::CmapEvent, hello::LEGACY_HELLO_COMMAND_NAME, options::{AuthMechanism, ClientOptions, Credential, ListDatabasesOptions, ServerAddress}, runtime, @@ -14,7 +15,6 @@ use crate::{ test::{ log_uncaptured, util::TestClient, - CmapEvent, Event, EventHandler, FailCommandOptions, @@ -832,7 +832,7 @@ async fn retry_commit_txn_check_out() { // ensure the first check out attempt fails subscriber .wait_for_event(Duration::from_secs(1), |e| { - matches!(e, Event::Cmap(CmapEvent::ConnectionCheckOutFailed(_))) + matches!(e, Event::Cmap(CmapEvent::ConnectionCheckoutFailed(_))) }) .await .expect("should see check out failed event"); diff --git a/src/test/mod.rs b/src/test/mod.rs index d151b9005..ef2520b5a 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -13,7 +13,7 @@ mod index_management; #[cfg(all(not(feature = "sync"), not(feature = "tokio-sync")))] mod lambda_examples; pub mod spec; -mod util; +pub(crate) mod util; pub(crate) use self::{ spec::{run_single_test, run_spec_test, run_spec_test_with_path, RunOn, Serverless, Topology}, @@ -21,7 +21,6 @@ pub(crate) use self::{ assert_matches, eq_matches, log_uncaptured, - CmapEvent, Event, EventClient, EventHandler, diff --git a/src/test/spec/json/connection-monitoring-and-pooling/README.rst b/src/test/spec/json/connection-monitoring-and-pooling/README.rst index 457725542..ae4af543f 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/README.rst +++ b/src/test/spec/json/connection-monitoring-and-pooling/README.rst @@ -11,214 +11,16 @@ Connection Monitoring and Pooling (CMAP) Introduction ============ +Drivers MUST implement all of the following types of CMAP tests: -The YAML and JSON files in this directory are platform-independent tests that -drivers can use to prove their conformance to the Connection Monitoring and Pooling (CMAP) Spec. - -Several prose tests, which are not easily expressed in YAML, are also presented -in this file. Those tests will need to be manually implemented by each driver. - -Common Test Format -================== - -Each YAML file has the following keys: - -- ``version``: A version number indicating the expected format of the spec tests (current version = 1) -- ``style``: A string indicating what style of tests this file contains. Contains one of the following: - - - ``"unit"``: a test that may be run without connecting to a MongoDB deployment. - - ``"integration"``: a test that MUST be run against a real MongoDB deployment. - -- ``description``: A text description of what the test is meant to assert - -Unit Test Format: -================= - -All Unit Tests have some of the following fields: - -- ``poolOptions``: If present, connection pool options to use when creating a pool; - both `standard ConnectionPoolOptions `__ - and the following test-specific options are allowed: - - - ``backgroundThreadIntervalMS``: A time interval between the end of a - `Background Thread Run `__ - and the beginning of the next Run. If a Connection Pool does not implement a Background Thread, the Test Runner MUST ignore the option. - If the option is not specified, an implementation is free to use any value it finds reasonable. - - Possible values (0 is not allowed): - - - A negative value: never begin a Run. - - A positive value: the interval between Runs in milliseconds. - -- ``operations``: A list of operations to perform. All operations support the following fields: - - - ``name``: A string describing which operation to issue. - - ``thread``: The name of the thread in which to run this operation. If not specified, runs in the default thread - -- ``error``: Indicates that the main thread is expected to error during this test. An error may include of the following fields: - - - ``type``: the type of error emitted - - ``message``: the message associated with that error - - ``address``: Address of pool emitting error - -- ``events``: An array of all connection monitoring events expected to occur while running ``operations``. An event may contain any of the following fields - - - ``type``: The type of event emitted - - ``address``: The address of the pool emitting the event - - ``connectionId``: The id of a connection associated with the event - - ``options``: Options used to create the pool - - ``reason``: A reason giving mroe information on why the event was emitted - -- ``ignore``: An array of event names to ignore - -Valid Unit Test Operations are the following: - -- ``start(target)``: Starts a new thread named ``target`` - - - ``target``: The name of the new thread to start - -- ``wait(ms)``: Sleep the current thread for ``ms`` milliseconds - - - ``ms``: The number of milliseconds to sleep the current thread for - -- ``waitForThread(target)``: wait for thread ``target`` to finish executing. Propagate any errors to the main thread. - - - ``target``: The name of the thread to wait for. - -- ``waitForEvent(event, count, timeout)``: block the current thread until ``event`` has occurred ``count`` times - - - ``event``: The name of the event - - ``count``: The number of times the event must occur (counting from the start of the test) - - ``timeout``: If specified, time out with an error after waiting for this many milliseconds without seeing the required events - -- ``label = pool.checkOut()``: call ``checkOut`` on pool, returning the checked out connection - - - ``label``: If specified, associate this label with the returned connection, so that it may be referenced in later operations - -- ``pool.checkIn(connection)``: call ``checkIn`` on pool - - - ``connection``: A string label identifying which connection to check in. Should be a label that was previously set with ``checkOut`` - -- ``pool.clear()``: call ``clear`` on Pool -- ``pool.close()``: call ``close`` on Pool -- ``pool.ready()``: call ``ready`` on Pool - - -Integration Test Format -======================= - -The integration test format is identical to the unit test format with -the addition of the following fields to each test: - -- ``runOn`` (optional): An array of server version and/or topology requirements - for which the tests can be run. If the test environment satisfies one or more - of these requirements, the tests may be executed; otherwise, this test should - be skipped. If this field is omitted, the tests can be assumed to have no - particular requirements and should be executed. Each element will have some or - all of the following fields: - - - ``minServerVersion`` (optional): The minimum server version (inclusive) - required to successfully run the tests. If this field is omitted, it should - be assumed that there is no lower bound on the required server version. - - - ``maxServerVersion`` (optional): The maximum server version (inclusive) - against which the tests can be run successfully. If this field is omitted, - it should be assumed that there is no upper bound on the required server - version. - -- ``failPoint``: optional, a document containing a ``configureFailPoint`` - command to run against the endpoint being used for the test. - -- ``poolOptions.appName`` (optional): appName attribute to be set in connections, which will be affected by the fail point. - -Spec Test Match Function -======================== - -The definition of MATCH or MATCHES in the Spec Test Runner is as follows: - -- MATCH takes two values, ``expected`` and ``actual`` -- Notation is "Assert [actual] MATCHES [expected] -- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value" - -Pseudocode implementation of ``actual`` MATCHES ``expected``: - -:: - - If expected is "42" or 42: - Assert that actual exists (is not null or undefined) - Else: - Assert that actual is of the same JSON type as expected - If expected is a JSON array: - For every idx/value in expected: - Assert that actual[idx] MATCHES value - Else if expected is a JSON object: - For every key/value in expected - Assert that actual[key] MATCHES value - Else: - Assert that expected equals actual - -Unit Test Runner: -================= - -For the unit tests, the behavior of a Connection is irrelevant beyond the need to asserting ``connection.id``. Drivers MAY use a mock connection class for testing the pool behavior in unit tests - -For each YAML file with ``style: unit``: - -- Create a Pool ``pool``, subscribe and capture any Connection Monitoring events emitted in order. - - - If ``poolOptions`` is specified, use those options to initialize both pools - - The returned pool must have an ``address`` set as a string value. - -- Process each ``operation`` in ``operations`` (on the main thread) - - - If a ``thread`` is specified, the main thread MUST schedule the operation to execute in the corresponding thread. Otherwise, execute the operation directly in the main thread. - -- If ``error`` is presented - - - Assert that an actual error ``actualError`` was thrown by the main thread - - Assert that ``actualError`` MATCHES ``error`` - -- Else: - - - Assert that no errors were thrown by the main thread - -- calculate ``actualEvents`` as every Connection Event emitted whose ``type`` is not in ``ignore`` -- if ``events`` is not empty, then for every ``idx``/``expectedEvent`` in ``events`` - - - Assert that ``actualEvents[idx]`` exists - - Assert that ``actualEvents[idx]`` MATCHES ``expectedEvent`` - - -It is important to note that the ``ignore`` list is used for calculating ``actualEvents``, but is NOT used for the ``waitForEvent`` command - -Integration Test Runner -======================= - -The steps to run the integration tests are the same as those used to run the -unit tests with the following modifications: - -- The integration tests MUST be run against an actual endpoint. If the - deployment being tested contains multiple endpoints, then the runner MUST - only use one of them to run the tests against. - -- For each test, if `failPoint` is specified, its value is a - ``configureFailPoint`` command. Run the command on the admin database of the - endpoint being tested to enable the fail point. - -- At the end of each test, any enabled fail point MUST be disabled to avoid - spurious failures in subsequent tests. The fail point may be disabled like - so:: - - db.adminCommand({ - configureFailPoint: , - mode: "off" - }); - +* Pool unit and integration tests as described in `cmap-format/README.rst <./cmap-format/README.rst>`__ +* Pool prose tests as described below in `Prose Tests`_ +* Logging tests as described below in `Logging Tests`_ Prose Tests =========== -The following tests have not yet been automated, but MUST still be tested +The following tests have not yet been automated, but MUST still be tested: #. All ConnectionPoolOptions MUST be specified at the MongoClient level #. All ConnectionPoolOptions MUST be the same for all pools created by a MongoClient @@ -226,3 +28,9 @@ The following tests have not yet been automated, but MUST still be tested #. A user MUST be able to subscribe to Connection Monitoring Events in a manner idiomatic to their language and driver #. When a check out attempt fails because connection set up throws an error, assert that a ConnectionCheckOutFailedEvent with reason="connectionError" is emitted. + +Logging Tests +============= + +Tests for connection pool logging can be found in the `/logging <./logging>`__ subdirectory and are written in the +`Unified Test Format <../../unified-test-format/unified-test-format.rst>`__. \ No newline at end of file diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst new file mode 100644 index 000000000..5bb72dd0f --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst @@ -0,0 +1,215 @@ +.. role:: javascript(code) + :language: javascript + +=================================================================== +Connection Monitoring and Pooling (CMAP) Unit and Integration Tests +=================================================================== + +.. contents:: + +-------- + +Introduction +============ + +The YAML and JSON files in this directory are platform-independent tests that +drivers can use to prove their conformance to the Connection Monitoring and Pooling (CMAP) Spec. + +Common Test Format +================== + +Each YAML file has the following keys: + +- ``version``: A version number indicating the expected format of the spec tests (current version = 1) +- ``style``: A string indicating what style of tests this file contains. Contains one of the following: + + - ``"unit"``: a test that may be run without connecting to a MongoDB deployment. + - ``"integration"``: a test that MUST be run against a real MongoDB deployment. + +- ``description``: A text description of what the test is meant to assert + +Unit Test Format: +================= + +All Unit Tests have some of the following fields: + +- ``poolOptions``: If present, connection pool options to use when creating a pool; + both `standard ConnectionPoolOptions `__ + and the following test-specific options are allowed: + + - ``backgroundThreadIntervalMS``: A time interval between the end of a + `Background Thread Run `__ + and the beginning of the next Run. If a Connection Pool does not implement a Background Thread, the Test Runner MUST ignore the option. + If the option is not specified, an implementation is free to use any value it finds reasonable. + + Possible values (0 is not allowed): + + - A negative value: never begin a Run. + - A positive value: the interval between Runs in milliseconds. + +- ``operations``: A list of operations to perform. All operations support the following fields: + + - ``name``: A string describing which operation to issue. + - ``thread``: The name of the thread in which to run this operation. If not specified, runs in the default thread + +- ``error``: Indicates that the main thread is expected to error during this test. An error may include of the following fields: + + - ``type``: the type of error emitted + - ``message``: the message associated with that error + - ``address``: Address of pool emitting error + +- ``events``: An array of all connection monitoring events expected to occur while running ``operations``. An event may contain any of the following fields + + - ``type``: The type of event emitted + - ``address``: The address of the pool emitting the event + - ``connectionId``: The id of a connection associated with the event + - ``options``: Options used to create the pool + - ``reason``: A reason giving mroe information on why the event was emitted + +- ``ignore``: An array of event names to ignore + +Valid Unit Test Operations are the following: + +- ``start(target)``: Starts a new thread named ``target`` + + - ``target``: The name of the new thread to start + +- ``wait(ms)``: Sleep the current thread for ``ms`` milliseconds + + - ``ms``: The number of milliseconds to sleep the current thread for + +- ``waitForThread(target)``: wait for thread ``target`` to finish executing. Propagate any errors to the main thread. + + - ``target``: The name of the thread to wait for. + +- ``waitForEvent(event, count, timeout)``: block the current thread until ``event`` has occurred ``count`` times + + - ``event``: The name of the event + - ``count``: The number of times the event must occur (counting from the start of the test) + - ``timeout``: If specified, time out with an error after waiting for this many milliseconds without seeing the required events + +- ``label = pool.checkOut()``: call ``checkOut`` on pool, returning the checked out connection + + - ``label``: If specified, associate this label with the returned connection, so that it may be referenced in later operations + +- ``pool.checkIn(connection)``: call ``checkIn`` on pool + + - ``connection``: A string label identifying which connection to check in. Should be a label that was previously set with ``checkOut`` + +- ``pool.clear()``: call ``clear`` on Pool + + - ``interruptInUseConnections``: Determines whether "in use" connections should be also interrupted + +- ``pool.close()``: call ``close`` on Pool +- ``pool.ready()``: call ``ready`` on Pool + + +Integration Test Format +======================= + +The integration test format is identical to the unit test format with +the addition of the following fields to each test: + +- ``runOn`` (optional): An array of server version and/or topology requirements + for which the tests can be run. If the test environment satisfies one or more + of these requirements, the tests may be executed; otherwise, this test should + be skipped. If this field is omitted, the tests can be assumed to have no + particular requirements and should be executed. Each element will have some or + all of the following fields: + + - ``minServerVersion`` (optional): The minimum server version (inclusive) + required to successfully run the tests. If this field is omitted, it should + be assumed that there is no lower bound on the required server version. + + - ``maxServerVersion`` (optional): The maximum server version (inclusive) + against which the tests can be run successfully. If this field is omitted, + it should be assumed that there is no upper bound on the required server + version. + +- ``failPoint``: optional, a document containing a ``configureFailPoint`` + command to run against the endpoint being used for the test. + +- ``poolOptions.appName`` (optional): appName attribute to be set in connections, which will be affected by the fail point. + +Spec Test Match Function +======================== + +The definition of MATCH or MATCHES in the Spec Test Runner is as follows: + +- MATCH takes two values, ``expected`` and ``actual`` +- Notation is "Assert [actual] MATCHES [expected] +- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value" + +Pseudocode implementation of ``actual`` MATCHES ``expected``: + +:: + + If expected is "42" or 42: + Assert that actual exists (is not null or undefined) + Else: + Assert that actual is of the same JSON type as expected + If expected is a JSON array: + For every idx/value in expected: + Assert that actual[idx] MATCHES value + Else if expected is a JSON object: + For every key/value in expected + Assert that actual[key] MATCHES value + Else: + Assert that expected equals actual + +Unit Test Runner: +================= + +For the unit tests, the behavior of a Connection is irrelevant beyond the need to asserting ``connection.id``. Drivers MAY use a mock connection class for testing the pool behavior in unit tests + +For each YAML file with ``style: unit``: + +- Create a Pool ``pool``, subscribe and capture any Connection Monitoring events emitted in order. + + - If ``poolOptions`` is specified, use those options to initialize both pools + - The returned pool must have an ``address`` set as a string value. + +- Process each ``operation`` in ``operations`` (on the main thread) + + - If a ``thread`` is specified, the main thread MUST schedule the operation to execute in the corresponding thread. Otherwise, execute the operation directly in the main thread. + +- If ``error`` is presented + + - Assert that an actual error ``actualError`` was thrown by the main thread + - Assert that ``actualError`` MATCHES ``error`` + +- Else: + + - Assert that no errors were thrown by the main thread + +- calculate ``actualEvents`` as every Connection Event emitted whose ``type`` is not in ``ignore`` +- if ``events`` is not empty, then for every ``idx``/``expectedEvent`` in ``events`` + + - Assert that ``actualEvents[idx]`` exists + - Assert that ``actualEvents[idx]`` MATCHES ``expectedEvent`` + + +It is important to note that the ``ignore`` list is used for calculating ``actualEvents``, but is NOT used for the ``waitForEvent`` command + +Integration Test Runner +======================= + +The steps to run the integration tests are the same as those used to run the +unit tests with the following modifications: + +- The integration tests MUST be run against an actual endpoint. If the + deployment being tested contains multiple endpoints, then the runner MUST + only use one of them to run the tests against. + +- For each test, if `failPoint` is specified, its value is a + ``configureFailPoint`` command. Run the command on the admin database of the + endpoint being tested to enable the fail point. + +- At the end of each test, any enabled fail point MUST be disabled to avoid + spurious failures in subsequent tests. The fail point may be disabled like + so:: + + db.adminCommand({ + configureFailPoint: , + mode: "off" + }); diff --git a/src/test/spec/json/connection-monitoring-and-pooling/connection-must-have-id.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-have-id.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/connection-must-have-id.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-have-id.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/connection-must-have-id.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-have-id.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/connection-must-have-id.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-have-id.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/connection-must-order-ids.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-order-ids.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/connection-must-order-ids.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-order-ids.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/connection-must-order-ids.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-order-ids.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/connection-must-order-ids.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/connection-must-order-ids.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-closed.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-closed.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-closed.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-closed.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-closed.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-closed.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-closed.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-closed.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-stale.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-stale.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-stale.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-stale.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-stale.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-stale.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-destroy-stale.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-destroy-stale.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-make-available.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-make-available.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-make-available.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin-make-available.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkin.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkin.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-connection.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-connection.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-connection.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-connection.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-error-closed.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-error-closed.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-error-closed.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-error-closed.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-is-enforced.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-is-enforced.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-is-enforced.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-is-enforced.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-timeout.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-timeout.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-timeout.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-maxConnecting-timeout.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-multiple.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-multiple.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-multiple.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-multiple.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-multiple.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-multiple.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-multiple.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-multiple.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-idle.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-idle.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-idle.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-idle.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-idle.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-idle.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-idle.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-idle.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-stale.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-stale.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-stale.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-stale.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-stale.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-stale.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-no-stale.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-no-stale.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-returned-connection-maxConnecting.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-returned-connection-maxConnecting.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-returned-connection-maxConnecting.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-checkout-returned-connection-maxConnecting.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-clears-waitqueue.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-clears-waitqueue.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-clears-waitqueue.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-clears-waitqueue.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-min-size.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-min-size.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-min-size.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-min-size.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-min-size.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-min-size.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-min-size.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-min-size.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-paused.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-paused.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-paused.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-paused.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-paused.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-paused.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-paused.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-paused.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-ready.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-ready.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear-ready.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear-ready.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-clear.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-clear.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-close-destroy-conns.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close-destroy-conns.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-close-destroy-conns.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close-destroy-conns.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-close-destroy-conns.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close-destroy-conns.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-close-destroy-conns.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close-destroy-conns.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-close.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-close.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-close.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-close.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-close.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-max-size.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-max-size.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-max-size.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-max-size.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-max-size.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-max-size.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-max-size.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-max-size.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size-error.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size-error.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size-error.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size-error.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-min-size.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-with-options.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-with-options.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-with-options.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-with-options.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create-with-options.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-with-options.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create-with-options.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-with-options.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-create.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-create.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-ready-ready.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready-ready.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-ready-ready.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready-ready.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-ready-ready.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready-ready.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-ready-ready.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready-ready.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-ready.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-ready.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/pool-ready.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/pool-ready.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/wait-queue-fairness.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-fairness.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/wait-queue-fairness.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-fairness.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/wait-queue-fairness.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-fairness.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/wait-queue-fairness.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-fairness.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/wait-queue-timeout.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/wait-queue-timeout.json rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json diff --git a/src/test/spec/json/connection-monitoring-and-pooling/wait-queue-timeout.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml similarity index 100% rename from src/test/spec/json/connection-monitoring-and-pooling/wait-queue-timeout.yml rename to src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml diff --git a/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.json b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.json new file mode 100644 index 000000000..e21a3d049 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.json @@ -0,0 +1,435 @@ +{ + "description": "connection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "Create a client, run a command, and close the client", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeLogMessages": { + "connection": "debug" + } + } + } + ] + } + }, + { + "name": "listDatabases", + "object": "client", + "arguments": { + "filter": {} + } + }, + { + "name": "close", + "object": "client" + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked out", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checked in", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection closed", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "Connection pool was closed" + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool closed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Connection checkout fails due to error establishing connection", + "runOnRequirements": [ + { + "auth": true, + "minServerVersion": "4.0" + } + ], + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryReads": false, + "appname": "clientAppName", + "heartbeatFrequencyMS": 10000 + }, + "observeLogMessages": { + "connection": "debug" + } + } + } + ] + } + }, + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "saslContinue" + ], + "closeConnection": true, + "appName": "clientAppName" + } + } + } + }, + { + "name": "listDatabases", + "object": "client", + "arguments": { + "filter": {} + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout started", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool cleared", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection closed", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while using the connection", + "error": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection checkout failed", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "reason": "An error occurred while trying to establish a new connection", + "error": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.yml b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.yml new file mode 100644 index 000000000..58ac7ec34 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-logging.yml @@ -0,0 +1,196 @@ +description: "connection-logging" + +schemaVersion: "1.13" + +runOnRequirements: + - topologies: + - single # The number of log messages is different for each topology since there is a connection pool per host. + +createEntities: + - client: + id: &failPointClient failPointClient + +tests: + - description: "Create a client, run a command, and close the client" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + observeLogMessages: + connection: debug + - name: listDatabases + object: *client + arguments: + filter: {} + - name: close + object: *client + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection ready" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked out" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checked in" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection closed" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "Connection pool was closed" + + - level: debug + component: connection + data: + message: "Connection pool closed" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # This test exists to provide coverage of checkout failed and pool cleared events. + - description: "Connection checkout fails due to error establishing connection" + runOnRequirements: + - auth: true + minServerVersion: "4.0" # failCommand was added to mongod in 4.0 + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + uriOptions: + retryReads: false + appname: &clientAppName clientAppName + # use a high heartbeatFrequencyMS to avoid a successful monitor check marking the pool as + # ready (and emitting another event) during the course of test execution. + heartbeatFrequencyMS: 10000 + observeLogMessages: + connection: debug + - name: failPoint + object: testRunner + arguments: + client: *failPointClient + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["saslContinue"] + closeConnection: true + appName: *clientAppName + - name: listDatabases + object: *client + arguments: + filter: {} + expectError: + isClientError: true + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection checkout started" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection pool cleared" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection closed" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "An error occurred while using the connection" + error: { $$exists: true } + + - level: debug + component: connection + data: + message: "Connection checkout failed" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + reason: "An error occurred while trying to establish a new connection" + error: { $$exists: true } diff --git a/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.json b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.json new file mode 100644 index 000000000..e67804915 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.json @@ -0,0 +1,451 @@ +{ + "description": "connection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "tests": [ + { + "description": "Options should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "connectionReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "minPoolSize": 1, + "maxPoolSize": 5, + "maxIdleTimeMS": 10000 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "connectionReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "minPoolSize": 1, + "maxPoolSize": 5, + "maxIdleTimeMS": 10000 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection created", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection ready", + "driverConnectionId": { + "$$type": [ + "int", + "long" + ] + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "maxConnecting should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "maxConnecting": 5 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "maxConnecting": 5 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueTimeoutMS should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueTimeoutMS": 10000 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueTimeoutMS": 10000 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueSize should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueSize": 100 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueSize": 100 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "waitQueueMultiple should be included in connection pool created message when specified", + "operations": [ + { + "name": "createEntities", + "object": "testRunner", + "arguments": { + "entities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "poolReadyEvent" + ], + "observeLogMessages": { + "connection": "debug" + }, + "uriOptions": { + "waitQueueSize": 5 + } + } + } + ] + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "poolReadyEvent": {} + }, + "count": 1 + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool created", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + }, + "waitQueueMultiple": 5 + } + }, + { + "level": "debug", + "component": "connection", + "data": { + "message": "Connection pool ready", + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + } + ] +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.yml b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.yml new file mode 100644 index 000000000..b22693a92 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/logging/connection-pool-options.yml @@ -0,0 +1,253 @@ +description: "connection-logging" + +schemaVersion: "1.13" + +runOnRequirements: + - topologies: + - single # The number of log messages is different for each topology since there is a connection pool per host. + +tests: + - description: "Options should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait on a connection ready event for the connection created in the background. + # This is to avoid raciness around whether the background thread has created the connection + # (and whether corresponding log messages have been generated) by the time log message assertions + # are made. + observeEvents: + - connectionReadyEvent + observeLogMessages: + connection: debug + uriOptions: + minPoolSize: 1 + maxPoolSize: 5 + maxIdleTimeMS: 10000 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + connectionReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + minPoolSize: 1 + maxPoolSize: 5 + maxIdleTimeMS: 10000 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection created" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + - level: debug + component: connection + data: + message: "Connection ready" + driverConnectionId: { $$type: [int, long] } + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers who have not done DRIVERS-1943 will need to skip this test. + - description: "maxConnecting should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + maxConnecting: 5 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + maxConnecting: 5 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueTimeoutMS will need to skip this test. + - description: "waitQueueTimeoutMS should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueTimeoutMS: 10000 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueTimeoutMS: 10000 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueSize will need to skip this test. + - description: "waitQueueSize should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueSize: 100 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueSize: 100 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + + # Drivers that do not support waitQueueMultiple will need to skip this test. + - description: "waitQueueMultiple should be included in connection pool created message when specified" + operations: + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + # Observe and wait for a poolReadyEvent so we can ensure the pool has been created and is + # ready by the time we assert on log messages, in order to avoid raciness around which messages + # are emitted. + observeEvents: + - poolReadyEvent + observeLogMessages: + connection: debug + uriOptions: + waitQueueSize: 5 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + expectLogMessages: + - client: *client + messages: + - level: debug + component: connection + data: + message: "Connection pool created" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } + waitQueueMultiple: 5 + + - level: debug + component: connection + data: + message: "Connection pool ready" + serverHost: { $$type: string } + serverPort: { $$type: [int, long] } diff --git a/src/test/spec/retryable_reads.rs b/src/test/spec/retryable_reads.rs index c5e9b9ec9..73a946177 100644 --- a/src/test/spec/retryable_reads.rs +++ b/src/test/spec/retryable_reads.rs @@ -6,7 +6,7 @@ use tokio::sync::RwLockWriteGuard; use crate::{ error::Result, event::{ - cmap::{CmapEventHandler, ConnectionCheckoutFailedReason}, + cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason}, command::CommandEventHandler, }, runtime, @@ -14,7 +14,6 @@ use crate::{ test::{ log_uncaptured, run_spec_test, - CmapEvent, Event, EventHandler, FailCommandOptions, @@ -150,7 +149,7 @@ async fn retry_read_pool_cleared() { let _ = subscriber .wait_for_event(Duration::from_millis(500), |event| match event { - Event::Cmap(CmapEvent::ConnectionCheckOutFailed(e)) => { + Event::Cmap(CmapEvent::ConnectionCheckoutFailed(e)) => { matches!(e.reason, ConnectionCheckoutFailedReason::ConnectionError) } _ => false, diff --git a/src/test/spec/retryable_writes/mod.rs b/src/test/spec/retryable_writes/mod.rs index 1c04a482f..99df92b76 100644 --- a/src/test/spec/retryable_writes/mod.rs +++ b/src/test/spec/retryable_writes/mod.rs @@ -12,7 +12,7 @@ use crate::{ bson::{doc, Document}, error::{ErrorKind, Result, RETRYABLE_WRITE_ERROR}, event::{ - cmap::{CmapEventHandler, ConnectionCheckoutFailedReason}, + cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason}, command::CommandEventHandler, }, options::{ClientOptions, FindOptions, InsertManyOptions}, @@ -24,7 +24,6 @@ use crate::{ log_uncaptured, run_spec_test, util::get_default_name, - CmapEvent, Event, EventClient, EventHandler, @@ -481,7 +480,7 @@ async fn retry_write_pool_cleared() { let _ = subscriber .wait_for_event(Duration::from_millis(500), |event| match event { - Event::Cmap(CmapEvent::ConnectionCheckOutFailed(e)) => { + Event::Cmap(CmapEvent::ConnectionCheckoutFailed(e)) => { matches!(e.reason, ConnectionCheckoutFailedReason::ConnectionError) } _ => false, diff --git a/src/test/spec/trace.rs b/src/test/spec/trace.rs index 8e5aaf611..8788c5442 100644 --- a/src/test/spec/trace.rs +++ b/src/test/spec/trace.rs @@ -28,6 +28,8 @@ use crate::{ }; use std::{collections::HashMap, iter, time::Duration}; +use super::{run_unified_format_test_filtered, unified_runner::TestCase}; + #[test] fn tracing_truncation() { let two_emoji = String::from("🤔🤔"); @@ -362,3 +364,23 @@ async fn command_logging_unified() { ) .await; } + +#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))] +#[cfg_attr(feature = "async-std-runtime", async_std::test)] +async fn connection_logging_unified() { + let test_predicate = |tc: &TestCase| + // TODO: RUST-1096 Unskip when configurable maxConnecting is added. + tc.description != "maxConnecting should be included in connection pool created message when specified" && + // We don't support any of these options (and are unlikely to ever support them). + tc.description != "waitQueueTimeoutMS should be included in connection pool created message when specified" && + tc.description != "waitQueueSize should be included in connection pool created message when specified" && + tc.description != "waitQueueMultiple should be included in connection pool created message when specified"; + + let _guard = LOCK.run_exclusively().await; + + run_spec_test_with_path( + &["connection-monitoring-and-pooling", "logging"], + |path, file| run_unified_format_test_filtered(path, file, test_predicate), + ) + .await; +} diff --git a/src/test/spec/unified_runner/entity.rs b/src/test/spec/unified_runner/entity.rs index 91b53ccb1..7e4f3481b 100644 --- a/src/test/spec/unified_runner/entity.rs +++ b/src/test/spec/unified_runner/entity.rs @@ -47,10 +47,11 @@ pub(crate) enum Entity { TopologyDescription(TopologyDescription), None, } - #[derive(Clone, Debug)] pub(crate) struct ClientEntity { - client: Client, + /// This is None if a `close` operation has been executed for this entity. + pub(crate) client: Option, + pub(crate) topology_id: bson::oid::ObjectId, handler: Arc, pub(crate) observer: Arc>, observe_events: Option>, @@ -124,8 +125,10 @@ impl ClientEntity { observe_sensitive_commands: bool, ) -> Self { let observer = EventObserver::new(handler.broadcaster().subscribe()); + let topology_id = client.topology().id; Self { - client, + client: Some(client), + topology_id, handler, observer: Arc::new(Mutex::new(observer)), observe_events, @@ -200,7 +203,9 @@ impl ClientEntity { /// Synchronize all connection pool worker threads. pub(crate) async fn sync_workers(&self) { - self.client.sync_workers().await; + if let Some(client) = &self.client { + client.sync_workers().await; + } } } @@ -267,7 +272,13 @@ impl Deref for ClientEntity { type Target = Client; fn deref(&self) -> &Self::Target { - &self.client + match &self.client { + Some(c) => c, + None => panic!( + "Attempted to deference a client entity which was closed via a `close` test \ + operation" + ), + } } } @@ -306,6 +317,13 @@ impl Entity { } } + pub(crate) fn as_mut_client(&mut self) -> &mut ClientEntity { + match self { + Self::Client(client) => client, + _ => panic!("Expected client, got {:?}", &self), + } + } + pub(crate) fn as_database(&self) -> &Database { match self { Self::Database(database) => database, @@ -382,4 +400,22 @@ impl Entity { _ => panic!("Expected event list, got {:?}", &self), } } + + /// If this entity is descended from a client entity, returns the topology ID for that client. + pub(crate) async fn client_topology_id(&self) -> Option { + match self { + Entity::Client(client_entity) => Some(client_entity.topology_id), + Entity::Database(database) => Some(database.client().topology().id), + Entity::Collection(collection) => Some(collection.client().topology().id), + Entity::Session(session) => Some(session.client().topology().id), + Entity::Bucket(bucket) => Some(bucket.client().topology().id), + Entity::Cursor(cursor) => match cursor { + TestCursor::Normal(cursor) => Some(cursor.lock().await.client().topology().id), + TestCursor::Session { cursor, .. } => Some(cursor.client().topology().id), + TestCursor::ChangeStream(cs) => Some(cs.lock().await.client().topology().id), + TestCursor::Closed => None, + }, + _ => None, + } + } } diff --git a/src/test/spec/unified_runner/matcher.rs b/src/test/spec/unified_runner/matcher.rs index fcc331c49..365add131 100644 --- a/src/test/spec/unified_runner/matcher.rs +++ b/src/test/spec/unified_runner/matcher.rs @@ -3,8 +3,8 @@ use bson::Document; use crate::{ bson::{doc, spec::ElementType, Bson}, bson_util::get_int, - event::{command::CommandEvent, sdam::ServerDescription}, - test::{CmapEvent, Event, SdamEvent}, + event::{cmap::CmapEvent, command::CommandEvent, sdam::ServerDescription}, + test::{Event, SdamEvent}, }; use super::{ @@ -290,11 +290,11 @@ fn cmap_events_match(actual: &CmapEvent, expected: &ExpectedCmapEvent) -> Result }, ) => match_opt(&actual.reason, expected_reason), ( - CmapEvent::ConnectionCheckOutStarted(_), + CmapEvent::ConnectionCheckoutStarted(_), ExpectedCmapEvent::ConnectionCheckOutStarted {}, ) => Ok(()), ( - CmapEvent::ConnectionCheckOutFailed(actual), + CmapEvent::ConnectionCheckoutFailed(actual), ExpectedCmapEvent::ConnectionCheckOutFailed { reason: expected_reason, }, diff --git a/src/test/spec/unified_runner/operation.rs b/src/test/spec/unified_runner/operation.rs index 31003aef1..dd098331a 100644 --- a/src/test/spec/unified_runner/operation.rs +++ b/src/test/spec/unified_runner/operation.rs @@ -2120,12 +2120,48 @@ impl TestOperation for Close { ) -> BoxFuture<'a, Result>> { async move { let mut entities = test_runner.entities.write().await; - let cursor = entities.get_mut(id).unwrap().as_mut_cursor(); - let rx = cursor.make_kill_watcher().await; - *cursor = TestCursor::Closed; - drop(entities); - let _ = rx.await; - Ok(None) + let target_entity = entities.get(id).unwrap(); + match target_entity { + Entity::Client(_) => { + let mut client = entities.get_mut(id).unwrap().as_mut_client(); + let closed_client_topology_id = client.topology_id; + client.client = None; + + let mut entities_to_remove = vec![]; + for (key, value) in entities.iter() { + match value { + // skip clients so that we don't remove the client entity itself from + // the map: we want to preserve it so we can + // access the other data stored on the entity. + Entity::Client(_) => {} + _ => { + if value.client_topology_id().await + == Some(closed_client_topology_id) + { + entities_to_remove.push(key.clone()); + } + } + } + } + for entity_id in entities_to_remove { + entities.remove(&entity_id); + } + + Ok(None) + } + Entity::Cursor(_) => { + let cursor = entities.get_mut(id).unwrap().as_mut_cursor(); + let rx = cursor.make_kill_watcher().await; + *cursor = TestCursor::Closed; + drop(entities); + let _ = rx.await; + Ok(None) + } + _ => panic!( + "Unsupported entity {:?} for close operation; expected Client or Cursor", + target_entity + ), + } } .boxed() } diff --git a/src/test/spec/unified_runner/test_event.rs b/src/test/spec/unified_runner/test_event.rs index 42d0e7b42..349028d57 100644 --- a/src/test/spec/unified_runner/test_event.rs +++ b/src/test/spec/unified_runner/test_event.rs @@ -1,10 +1,10 @@ use crate::{ bson::Document, event::{ - cmap::{ConnectionCheckoutFailedReason, ConnectionClosedReason}, + cmap::{CmapEvent, ConnectionCheckoutFailedReason, ConnectionClosedReason}, command::CommandEvent, }, - test::{CmapEvent, Event}, + test::Event, ServerType, }; use serde::Deserialize; @@ -141,11 +141,11 @@ impl ObserveEvent { (Self::ConnectionClosed, Event::Cmap(CmapEvent::ConnectionClosed(_))) => true, ( Self::ConnectionCheckOutStarted, - Event::Cmap(CmapEvent::ConnectionCheckOutStarted(_)), + Event::Cmap(CmapEvent::ConnectionCheckoutStarted(_)), ) => true, ( Self::ConnectionCheckOutFailed, - Event::Cmap(CmapEvent::ConnectionCheckOutFailed(_)), + Event::Cmap(CmapEvent::ConnectionCheckoutFailed(_)), ) => true, (Self::ConnectionCheckedOut, Event::Cmap(CmapEvent::ConnectionCheckedOut(_))) => true, (Self::ConnectionCheckedIn, Event::Cmap(CmapEvent::ConnectionCheckedIn(_))) => true, diff --git a/src/test/spec/unified_runner/test_file.rs b/src/test/spec/unified_runner/test_file.rs index bb7e0690f..3f108f6a1 100644 --- a/src/test/spec/unified_runner/test_file.rs +++ b/src/test/spec/unified_runner/test_file.rs @@ -613,6 +613,7 @@ where fn log_component_as_tracing_target(component: &String) -> String { match component.as_ref() { "command" => trace::COMMAND_TRACING_EVENT_TARGET.to_string(), + "connection" => trace::CONNECTION_TRACING_EVENT_TARGET.to_string(), _ => panic!("Unknown tracing target: {}", component), } } diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index ae98f5593..19c4a1f32 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -276,8 +276,7 @@ impl TestRunner { .await; for expectation in expected_messages { - let client_topology_id = - self.get_client(&expectation.client).await.topology().id; + let client_topology_id = self.get_client(&expectation.client).await.topology_id; let client_actual_events: Vec<_> = all_tracing_events .iter() diff --git a/src/test/util/event.rs b/src/test/util/event.rs index f54f726b4..440ce7ec9 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use derive_more::From; use serde::Serialize; use time::OffsetDateTime; use tokio::sync::{broadcast::error::SendError, RwLockReadGuard}; @@ -15,6 +16,7 @@ use crate::{ bson::{doc, to_document, Document}, event::{ cmap::{ + CmapEvent, CmapEventHandler, ConnectionCheckedInEvent, ConnectionCheckedOutEvent, @@ -54,7 +56,6 @@ use crate::{ }; pub(crate) type EventQueue = Arc>>; -pub(crate) type CmapEvent = crate::cmap::test::event::Event; fn add_event_to_queue(event_queue: &EventQueue, event: T) { event_queue @@ -313,7 +314,7 @@ impl CmapEventHandler for EventHandler { } fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) { - let event = CmapEvent::ConnectionCheckOutFailed(event); + let event = CmapEvent::ConnectionCheckoutFailed(event); self.handle(event.clone()); add_event_to_queue(&self.cmap_events, event); } @@ -361,7 +362,7 @@ impl CmapEventHandler for EventHandler { } fn handle_connection_checkout_started_event(&self, event: ConnectionCheckoutStartedEvent) { - let event = CmapEvent::ConnectionCheckOutStarted(event); + let event = CmapEvent::ConnectionCheckoutStarted(event); self.handle(event.clone()); add_event_to_queue(&self.cmap_events, event); } diff --git a/src/test/util/mod.rs b/src/test/util/mod.rs index 803e2682f..8700b412b 100644 --- a/src/test/util/mod.rs +++ b/src/test/util/mod.rs @@ -7,10 +7,11 @@ mod subscriber; mod trace; pub(crate) use self::{ - event::{CmapEvent, Event, EventClient, EventHandler, SdamEvent}, + event::{Event, EventClient, EventHandler, SdamEvent}, failpoint::{FailCommandOptions, FailPoint, FailPointGuard, FailPointMode}, lock::TestLock, matchable::{assert_matches, eq_matches, MatchErrExt, Matchable}, + subscriber::EventSubscriber, }; #[cfg(feature = "tracing-unstable")] diff --git a/src/test/util/subscriber.rs b/src/test/util/subscriber.rs index 6b4f59d22..40efdd629 100644 --- a/src/test/util/subscriber.rs +++ b/src/test/util/subscriber.rs @@ -76,4 +76,18 @@ impl EventSubscriber<'_, H, E> { .ok() .flatten() } + + /// Returns the received events without waiting for any more. + pub(crate) fn all(&mut self, filter: F) -> Vec + where + F: Fn(&E) -> bool, + { + let mut events = Vec::new(); + while let Ok(event) = self.receiver.try_recv() { + if filter(&event) { + events.push(event); + } + } + events + } } diff --git a/src/trace.rs b/src/trace.rs index 6e4e825b8..68d34e472 100644 --- a/src/trace.rs +++ b/src/trace.rs @@ -1,31 +1,53 @@ +use crate::client::options::{ServerAddress, DEFAULT_PORT}; use bson::Bson; pub(crate) mod command; +pub(crate) mod connection; pub(crate) const COMMAND_TRACING_EVENT_TARGET: &str = "mongodb::command"; +pub(crate) const CONNECTION_TRACING_EVENT_TARGET: &str = "mongodb::connection"; trait TracingRepresentation { - fn tracing_representation(self) -> String; + type Representation; + + fn tracing_representation(self) -> Self::Representation; } impl TracingRepresentation for bson::oid::ObjectId { + type Representation = String; + fn tracing_representation(self) -> String { self.to_hex() } } impl TracingRepresentation for bson::Document { + type Representation = String; + fn tracing_representation(self) -> String { Bson::Document(self).into_relaxed_extjson().to_string() } } impl TracingRepresentation for crate::error::Error { + type Representation = String; + fn tracing_representation(self) -> String { self.to_string() } } +impl ServerAddress { + /// Per spec should populate the port field with 27017 if we are defaulting to that. + fn port_tracing_representation(&self) -> Option { + match self { + Self::Tcp { port, .. } => Some(port.unwrap_or(DEFAULT_PORT)), + // TODO: RUST-802 For Unix domain sockets we should return None here, as ports + // are not meaningful for those. + } + } +} + /// We don't currently use all of these levels but they are included for completeness. #[allow(dead_code)] pub(crate) enum TracingOrLogLevel { diff --git a/src/trace/command.rs b/src/trace/command.rs index c3d13fc4b..244e7e172 100644 --- a/src/trace/command.rs +++ b/src/trace/command.rs @@ -37,19 +37,15 @@ impl CommandEventHandler for CommandTracingEventEmitter { tracing::debug!( target: COMMAND_TRACING_EVENT_TARGET, topologyId = self.topology_id.tracing_representation(), - command = - serialize_command_or_reply(event.command, self.max_document_length_bytes).as_str(), - databaseName = event.db.as_str(), - commandName = event.command_name.as_str(), + command = serialize_command_or_reply(event.command, self.max_document_length_bytes), + databaseName = event.db, + commandName = event.command_name, requestId = event.request_id, driverConnectionId = event.connection.id, serverConnectionId = event.connection.server_id, serverHost = event.connection.address.host(), - serverPort = event.connection.address.port(), - serviceId = event - .service_id - .map(|id| id.tracing_representation()) - .as_deref(), + serverPort = event.connection.address.port_tracing_representation(), + serviceId = event.service_id.map(|id| id.tracing_representation()), "Command started" ); } @@ -58,18 +54,14 @@ impl CommandEventHandler for CommandTracingEventEmitter { tracing::debug!( target: COMMAND_TRACING_EVENT_TARGET, topologyId = self.topology_id.tracing_representation(), - reply = - serialize_command_or_reply(event.reply, self.max_document_length_bytes).as_str(), - commandName = event.command_name.as_str(), + reply = serialize_command_or_reply(event.reply, self.max_document_length_bytes), + commandName = event.command_name, requestId = event.request_id, driverConnectionId = event.connection.id, serverConnectionId = event.connection.server_id, serverHost = event.connection.address.host(), - serverPort = event.connection.address.port(), - serviceId = event - .service_id - .map(|id| id.tracing_representation()) - .as_deref(), + serverPort = event.connection.address.port_tracing_representation(), + serviceId = event.service_id.map(|id| id.tracing_representation()), durationMS = event.duration.as_millis(), "Command succeeded" ); @@ -80,16 +72,13 @@ impl CommandEventHandler for CommandTracingEventEmitter { target: COMMAND_TRACING_EVENT_TARGET, topologyId = self.topology_id.tracing_representation(), failure = event.failure.tracing_representation(), - commandName = event.command_name.as_str(), + commandName = event.command_name, requestId = event.request_id, driverConnectionId = event.connection.id, serverConnectionId = event.connection.server_id, serverHost = event.connection.address.host(), - serverPort = event.connection.address.port(), - serviceId = event - .service_id - .map(|id| id.tracing_representation()) - .as_deref(), + serverPort = event.connection.address.port_tracing_representation(), + serviceId = event.service_id.map(|id| id.tracing_representation()), durationMS = event.duration.as_millis(), "Command failed" ); diff --git a/src/trace/connection.rs b/src/trace/connection.rs new file mode 100644 index 000000000..a0b76e2fc --- /dev/null +++ b/src/trace/connection.rs @@ -0,0 +1,190 @@ +use bson::oid::ObjectId; + +use crate::{ + event::cmap::{ + CmapEventHandler, + ConnectionCheckedInEvent, + ConnectionCheckedOutEvent, + ConnectionCheckoutFailedEvent, + ConnectionCheckoutFailedReason, + ConnectionCheckoutStartedEvent, + ConnectionClosedEvent, + ConnectionClosedReason, + ConnectionCreatedEvent, + ConnectionReadyEvent, + PoolClearedEvent, + PoolClosedEvent, + PoolCreatedEvent, + PoolReadyEvent, + }, + trace::{TracingRepresentation, CONNECTION_TRACING_EVENT_TARGET}, +}; + +#[derive(Clone)] +pub(crate) struct ConnectionTracingEventEmitter { + topology_id: ObjectId, +} + +impl ConnectionTracingEventEmitter { + pub(crate) fn new(topology_id: ObjectId) -> ConnectionTracingEventEmitter { + Self { topology_id } + } +} + +impl CmapEventHandler for ConnectionTracingEventEmitter { + fn handle_pool_created_event(&self, event: PoolCreatedEvent) { + let options_ref = event.options.as_ref(); + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + maxIdleTimeMS = options_ref.and_then(|o| o.max_idle_time.map(|m| m.as_millis())), + maxPoolSize = options_ref.and_then(|o| o.max_pool_size), + minPoolSize = options_ref.and_then(|o| o.min_pool_size), + "Connection pool created", + ); + } + + fn handle_pool_ready_event(&self, event: PoolReadyEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + "Connection pool ready", + ); + } + + fn handle_pool_cleared_event(&self, event: PoolClearedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + serviceId = event.service_id.map(|id| id.tracing_representation()), + "Connection pool cleared", + ); + } + + fn handle_pool_closed_event(&self, event: PoolClosedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + "Connection pool closed", + ); + } + + fn handle_connection_created_event(&self, event: ConnectionCreatedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection created", + ); + } + + fn handle_connection_ready_event(&self, event: ConnectionReadyEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection ready", + ); + } + + fn handle_connection_closed_event(&self, event: ConnectionClosedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + reason = event.reason.tracing_representation(), + error = event.error.map(|e| e.tracing_representation()), + "Connection closed", + ); + } + + fn handle_connection_checkout_started_event(&self, event: ConnectionCheckoutStartedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + "Connection checkout started", + ); + } + + fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + reason = event.reason.tracing_representation(), + error = event.error.map(|e| e.tracing_representation()), + "Connection checkout failed", + ); + } + + fn handle_connection_checked_out_event(&self, event: ConnectionCheckedOutEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection checked out", + ); + } + + fn handle_connection_checked_in_event(&self, event: ConnectionCheckedInEvent) { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection checked in", + ); + } +} + +impl TracingRepresentation for ConnectionClosedReason { + type Representation = &'static str; + + fn tracing_representation(self) -> &'static str { + match self { + ConnectionClosedReason::Stale => "Connection became stale because the pool was cleared", + ConnectionClosedReason::Idle => { + "Connection has been available but unused for longer than the configured max idle \ + time" + } + ConnectionClosedReason::Error => "An error occurred while using the connection", + ConnectionClosedReason::Dropped => "Connection was dropped during an operation", + ConnectionClosedReason::PoolClosed => "Connection pool was closed", + } + } +} + +impl TracingRepresentation for ConnectionCheckoutFailedReason { + type Representation = &'static str; + + fn tracing_representation(self) -> &'static str { + match self { + ConnectionCheckoutFailedReason::Timeout => { + "Failed to establish a new connection within connectTimeoutMS" + } + ConnectionCheckoutFailedReason::ConnectionError => { + "An error occurred while trying to establish a new connection" + } + } + } +}