
RUST-360 Streaming monitoring protocol #721

Merged

Conversation

patrickfreed (Contributor, Author):

RUST-360

This PR updates the driver to make use of the streaming monitoring protocol (described here), which allows the driver to accept topology updates that are pushed to it rather than polling the server regularly. This should allow the driver to more quickly recover from failover events.
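For context, here is a conceptual sketch (not code from this PR) of the shape of the awaitable hello command that the streaming protocol relies on, per the SDAM spec: once the monitor has seen an initial topologyVersion, it sends a hello carrying that version plus maxAwaitTimeMS, and the server replies only when its state changes or the wait expires. The awaitable_hello helper name and the use of bson's doc! macro are illustrative assumptions.

use bson::{doc, Document};

/// Builds an awaitable `hello` command (illustrative only; field names per the SDAM spec).
fn awaitable_hello(topology_version: Document, max_await_time_ms: i64) -> Document {
    doc! {
        "hello": 1,
        // the last topologyVersion the monitor observed for this server
        "topologyVersion": topology_version,
        // how long the server may block before replying if nothing has changed
        "maxAwaitTimeMS": max_await_time_ms,
    }
}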

&self,
conn: &mut Connection,
topology: Option<&Topology>,
handler: &Option<Arc<dyn SdamEventHandler>>,
patrickfreed (Contributor, Author):
The monitors now handle emitting these events, so we don't need to pipe these event handlers down into the handshaker anymore.

Contributor:

nice, this may make things easier for SDAM logging too 😇

acknowledgment_receiver.wait_for_acknowledgment().await;
}
///
/// Since management requests are treated with the highest priority by the pool and will be
patrickfreed (Contributor, Author):

I updated the pool worker task to use a biased select!, which means that it will poll in a certain order. Specifically for this, I updated it to check for management requests (clearing, marking as ready, etc) before checking for check-out requests. This means that we don't actually have to wait for acknowledgment that the pool is ready before returning here, since we know that no check-out requests will be processed before this MarkAsReady is.
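For illustration, a minimal sketch of how a biased select! polls management requests ahead of check-out requests. The receiver names and request types here are hypothetical stand-ins, not the pool's real types.

use tokio::sync::mpsc;

enum ManagementRequest { Clear, MarkAsReady }
struct CheckoutRequest;

async fn worker_loop(
    mut management_rx: mpsc::UnboundedReceiver<ManagementRequest>,
    mut checkout_rx: mpsc::UnboundedReceiver<CheckoutRequest>,
) {
    loop {
        tokio::select! {
            // With `biased`, branches are polled top-to-bottom, so any pending
            // management request is handled before any check-out request.
            biased;

            Some(request) = management_rx.recv() => {
                // handle Clear / MarkAsReady, etc.
                let _ = request;
            }
            Some(request) = checkout_rx.recv() => {
                // check out a connection only once management requests have drained
                let _ = request;
            }
            else => break,
        }
    }
}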

patrickfreed (Contributor, Author):

As it turns out, this wasn't completely race-proof, so I reverted back to waiting for acknowledgment.

Ok(hello_reply)
}) {
Ok(hello_reply) => {
emit_event(topology, handler, |handler| {
patrickfreed (Contributor, Author):

this logic was moved into the monitor.

src/lib.rs Outdated
@@ -297,7 +297,8 @@
allow(
clippy::unreadable_literal,
clippy::cognitive_complexity,
clippy::float_cmp
clippy::float_cmp,
clippy::match_like_matches_macro
patrickfreed (Contributor, Author):

this lint forces you to write something like

match a {
    SomeEnum::B => false,
    _ => true
}

as

!matches!(a, SomeEnum::B)

which I personally find less readable (super easy to miss that first !). What do you all think?

@@ -18,24 +18,31 @@ impl WorkerHandle {
/// Listener used to determine when all handles have been dropped.
#[derive(Debug)]
pub(crate) struct WorkerHandleListener {
receiver: mpsc::Receiver<()>,
sender: watch::Sender<()>,
patrickfreed (Contributor, Author):

This was rewritten to use a watch channel which is more appropriate for this type and also allowed us to use &self everywhere instead of &mut self. The functionality is otherwise unchanged.

Contributor:

"which is more appropriate for this type"

could you explain the rationale for this? I've read the docs for both mpsc and watch and it's not super obvious to me why it makes more sense for the listener to be the "producer" rather than the "consumer" (the other way actually seems more intuitive to me)

patrickfreed (Contributor, Author):

Sorry, yeah this comment is pretty vague. The main value is to be able to use the "closing" functionality on the sender half without taking a mutable reference. Also, the channel isn't actually used for sending any values, so watch not needing to allocate a buffer or anything under the hood is nice (watch channels are basically glorified RwLock<T>s with nice async semantics).
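For reference, a minimal sketch of the watch-channel approach: handles hold receiver clones, the listener holds the sender, and Sender::closed() resolves once every receiver has been dropped, through &self. The struct and method names mirror the PR but the bodies are illustrative, not the driver's actual implementation.

use tokio::sync::watch;

#[derive(Debug, Clone)]
struct WorkerHandle {
    _receiver: watch::Receiver<()>,
}

#[derive(Debug)]
struct WorkerHandleListener {
    sender: watch::Sender<()>,
}

impl WorkerHandleListener {
    fn channel() -> (WorkerHandle, WorkerHandleListener) {
        let (sender, receiver) = watch::channel(());
        (
            WorkerHandle { _receiver: receiver },
            WorkerHandleListener { sender },
        )
    }

    /// Completes once every WorkerHandle (receiver clone) has been dropped.
    /// Unlike mpsc::Receiver::recv, this only needs &self.
    async fn wait_for_all_handles_to_drop(&self) {
        self.sender.closed().await;
    }
}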

Contributor:

ahh that all makes sense. thanks!

fn sync_hosts(&mut self, hosts: HashSet<ServerAddress>) -> bool {
let mut new_description = self.topology_description.clone();
new_description.sync_hosts(&hosts);
self.update_topology(new_description)
patrickfreed (Contributor, Author):

Everything that modifies the topology now goes through the following steps:

  1. clone the existing topology description
  2. modify it in some way
  3. pass it to update_topology

This cuts down on the one-off logic in these various methods and hopefully makes the worker easier to read / reason about.
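To illustrate, here is a toy sketch of that clone -> modify -> update_topology flow; the types and the update_topology body are simplified stand-ins, not the real worker.

use std::collections::HashSet;

type ServerAddress = String;

#[derive(Clone, PartialEq, Debug)]
struct TopologyDescription {
    hosts: HashSet<ServerAddress>,
}

impl TopologyDescription {
    fn sync_hosts(&mut self, hosts: &HashSet<ServerAddress>) {
        self.hosts = hosts.clone();
    }
}

struct TopologyWorker {
    topology_description: TopologyDescription,
}

impl TopologyWorker {
    /// 1. clone the current description, 2. modify the clone,
    /// 3. hand it to update_topology, which applies it if anything changed.
    fn sync_hosts(&mut self, hosts: HashSet<ServerAddress>) -> bool {
        let mut new_description = self.topology_description.clone();
        new_description.sync_hosts(&hosts);
        self.update_topology(new_description)
    }

    /// Toy stand-in: the real method also broadcasts the new description.
    fn update_topology(&mut self, new_description: TopologyDescription) -> bool {
        let changed = new_description != self.topology_description;
        if changed {
            self.topology_description = new_description;
        }
        changed
    }
}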

@@ -94,7 +94,13 @@ impl RunOnRequirement {
}
}
if let Some(ref topologies) = self.topologies {
if !topologies.contains(&client.topology().await) {
let client_topology = client.topology().await;
if !topologies.iter().any(|expected_topology| {
patrickfreed (Contributor, Author):

Per the spec, a "sharded" runOnRequirement should match both "sharded" and "sharded-replicaset" topologies. Since we were using direct equality, we may have been skipping some tests accidentally.
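A rough sketch of the relaxed matching, using a stand-in Topology enum rather than the test runner's actual type:

#[derive(Debug, PartialEq)]
enum Topology {
    Single,
    ReplicaSet,
    Sharded,
    ShardedReplicaSet,
    LoadBalanced,
}

fn requirement_matches(expected: &Topology, actual: &Topology) -> bool {
    match (expected, actual) {
        // per the spec, a "sharded" requirement also accepts "sharded-replicaset"
        (Topology::Sharded, Topology::ShardedReplicaSet) => true,
        _ => expected == actual,
    }
}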

@@ -505,9 +505,12 @@ impl EventSubscriber<'_> {
F: FnMut(&Event) -> bool,
{
let mut events = Vec::new();
while let Some(event) = self.wait_for_event(timeout, &mut filter).await {
patrickfreed (Contributor, Author):

This changes how this method works at a fundamental level, but I think in a way that makes much more sense. Before, it would collect events until an event was not received for the given timeout, so if there was a constant stream of events it would go on forever. The new version collects events for the given time and returns what it observed.
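A sketch of that collect-for-a-fixed-window behavior, assuming tokio and a stand-in broadcast receiver rather than the real EventSubscriber:

use std::time::Duration;
use tokio::{sync::broadcast, time::Instant};

/// Collects matching events for the full `timeout` window and then returns,
/// rather than resetting the clock after every received event (which could
/// loop forever under a constant stream of events).
async fn collect_events<T: Clone>(
    receiver: &mut broadcast::Receiver<T>,
    timeout: Duration,
    mut filter: impl FnMut(&T) -> bool,
) -> Vec<T> {
    let mut events = Vec::new();
    let deadline = Instant::now() + timeout;
    // stops at the deadline, or early if the channel closes
    while let Ok(Ok(event)) = tokio::time::timeout_at(deadline, receiver.recv()).await {
        if filter(&event) {
            events.push(event);
        }
    }
    events
}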

@@ -319,7 +228,21 @@ impl Topology {
#[derive(Debug, Clone)]
pub(crate) struct TopologyState {
pub(crate) description: TopologyDescription,
pub(crate) servers: HashMap<ServerAddress, Arc<Server>>,
servers: HashMap<ServerAddress, Weak<Server>>,
patrickfreed (Contributor, Author) on Aug 11, 2022:

Storing Arc<Server> here actually clones them for every receiver (aka TopologyWatcher), which led to a reference cycle wherein the Servers were never actually getting dropped because the monitors held watchers, but the monitors never exited because the Servers never dropped. Storing Weak pointers here fixes this.

One future area to explore would be to ditch Arc / Weak here altogether and just pass clones of Server around, but I didn't want to go too deep down the refactor rabbit hole.

2.3.0 (which has the Topology refactor) is not affected by this issue, but that's because the monitors exit once they detect the topology is dropped, which then causes the Servers to drop. They have no need for the Server to be alive in order to be dropped. This does highlight a separate issue, which is that monitors on 2.3.0 (and probably earlier) don't exit if a server is removed from the Topology. This is fixed now, however. (Filed RUST-1443 for it)
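A minimal sketch (stand-in types, not the driver's) of why Weak breaks the cycle: cloned topology states no longer keep servers alive, and lookups upgrade the pointer only while the server still exists.

use std::{
    collections::HashMap,
    sync::{Arc, Weak},
};

#[derive(Debug)]
struct Server {
    address: String,
}

#[derive(Debug, Clone)]
struct TopologyState {
    servers: HashMap<String, Weak<Server>>,
}

impl TopologyState {
    /// Returns the server if it is still alive; once the last Arc<Server> is
    /// dropped elsewhere, this yields None instead of keeping the server alive
    /// through every cloned state held by a watcher.
    fn server(&self, address: &str) -> Option<Arc<Server>> {
        self.servers.get(address).and_then(Weak::upgrade)
    }
}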

@@ -1,7 +1,7 @@
use std::{
patrickfreed (Contributor, Author):

This file has gotten kind of long and complicated, so once this PR is merged I think I'll go in and break it down into a few smaller ones. For the purposes of preserving the diff I didn't do that in this PR.

patrickfreed (Contributor, Author):

"I will say that I don't have a great mental model of how all of the components in the driver interact, so I think the parts of the code that involve coordination/communication between different components have been difficult for me to review and provide meaningful input on, especially in cases where things have been refactored.

"I'm going to try to spend some time reasoning through how it all fits together before doing another round of review, but in the meantime figured I would go ahead and give these comments."

Sounds good. And yeah, the various refactors + cross-component nature is part of what made this difficult from the implementation side too. Would be happy to do a walkthrough of some of the changes over Zoom if that would be helpful to reviewers. If not, no worries.

@@ -116,19 +112,7 @@ lazy_static! {
version: None,
},
platform: format!("{} with {}", rustc_version_runtime::version_meta().short_version_string, RUNTIME_NAME),
};

let info = os_info::get();
Contributor:

If we do need to reintroduce it, we could avoid the timeout issue by reading the lazy_static in client initialization.

patrickfreed requested a review from abr-egn on September 7, 2022 at 20:29.
abr-egn (Contributor):

LGTM!

/// Create a new `TlsConfig` from the provided options from the user.
/// This operation is expensive, so the resultant `TlsConfig` should be cached.
pub(crate) fn new(options: TlsOptions) -> Result<TlsConfig> {
let verify_hostname = options.allow_invalid_hostnames.unwrap_or(true);
Contributor:

I think this logic is wrong - if a user sets allow_invalid_hostnames to true, shouldn't verify_hostname be false?

patrickfreed (Contributor, Author):

Oh good catch, yeah it is. Updated to use a match to make the logic clearer here. As a side note, I'm a little concerned this didn't trigger any test failures. Filed RUST-1467 for introducing test coverage for this.
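A sketch of the corrected mapping (the exact match arms in the PR may differ slightly): allowing invalid hostnames turns hostname verification off, and the default keeps it on.

/// Whether to verify hostnames, given the user's allow_invalid_hostnames option.
fn verify_hostname(allow_invalid_hostnames: Option<bool>) -> bool {
    match allow_invalid_hostnames {
        // the user explicitly allowed invalid hostnames, so skip verification
        Some(true) => false,
        // verify by default or when invalid hostnames are explicitly disallowed
        Some(false) | None => true,
    }
}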

) -> Result<Self> {
let inner = AsyncTcpStream::connect(&address).await?;

// If there are TLS options, wrap the inner stream with rustls.
Contributor:

I know this is a copy-paste from below, but is this comment still accurate, or can this also be OpenSSL now?

patrickfreed (Contributor, Author):

fixed

kmahar (Contributor):

LGTM! great job with this 👏

isabelatkinson (Contributor):

looks like all the questions i had were asked & answered -- lgtm!
