Skip to content

Commit f9baf09

Browse files
committed
Auto merge of #618 - Mark-Simulacrum:opt, r=Mark-Simulacrum
Move record-progress processing to separate thread
This is intended to let us prioritize work on other requests over work on record-progress, thereby avoiding some of the timeouts and "database is locked" errors we would otherwise see when the record-progress requests happen to take priority. This separate thread is designed to run only when the server has no requests in flight (other than a short, bounded queue of record-progress requests). If that queue fills up, we will tell workers to slow down, causing them to retry requests -- currently at fixed intervals and per worker thread, but a future commit might clean that up a little to have a more intentional delay. In general this should, hopefully, decrease the error rate, since human-initiated requests in particular should never have to wait for more than one record-progress event to complete before having largely uncontended access to the database. (Other requests still happen concurrently, but they are typically very rare in comparison to record-progress requests, which arrive multiple times a second and are effectively being processed constantly.) Errors like rust-lang/rust#94775 (comment) are the primary motivation here, and I hope this change is enough to largely clear them up.
2 parents 6386c67 + 229283e commit f9baf09

File tree

7 files changed

+188
-73
lines changed

7 files changed

+188
-73
lines changed

Cargo.lock

+28-46
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ chrono = { version = "0.4", features = ["serde"] }
1919
chrono-humanize = "0.1.1"
2020
crates-index = "0.16.2"
2121
crossbeam-utils = "0.5"
22+
crossbeam-channel = "0.5"
2223
csv = "1.0.2"
2324
dotenv = "0.13"
2425
failure = "0.1.3"

src/agent/api.rs

+2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
3636
match self.status() {
3737
StatusCode::NOT_FOUND => return Err(AgentApiError::InvalidEndpoint.into()),
3838
StatusCode::BAD_GATEWAY
39+
| StatusCode::TOO_MANY_REQUESTS
3940
| StatusCode::SERVICE_UNAVAILABLE
4041
| StatusCode::GATEWAY_TIMEOUT => {
4142
return Err(AgentApiError::ServerUnavailable.into());
@@ -50,6 +51,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
5051
.with_context(|_| format!("failed to parse API response (status code {})", status,))?;
5152
match result {
5253
ApiResponse::Success { result } => Ok(result),
54+
ApiResponse::SlowDown => Err(AgentApiError::ServerUnavailable.into()),
5355
ApiResponse::InternalError { error } => {
5456
Err(AgentApiError::InternalServerError(error).into())
5557
}

src/server/api_types.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub struct AgentConfig {
2020
#[serde(tag = "status", rename_all = "kebab-case")]
2121
pub enum ApiResponse<T> {
2222
Success { result: T },
23+
SlowDown,
2324
InternalError { error: String },
2425
Unauthorized,
2526
NotFound,
@@ -41,8 +42,9 @@ impl ApiResponse<()> {
4142

4243
impl<T> ApiResponse<T> {
4344
fn status_code(&self) -> StatusCode {
44-
match *self {
45+
match self {
4546
ApiResponse::Success { .. } => StatusCode::OK,
47+
ApiResponse::SlowDown => StatusCode::TOO_MANY_REQUESTS,
4648
ApiResponse::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
4749
ApiResponse::Unauthorized => StatusCode::UNAUTHORIZED,
4850
ApiResponse::NotFound => StatusCode::NOT_FOUND,

src/server/metrics.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use crate::server::agents::Agent;
55
use chrono::{DateTime, Utc};
66
use prometheus::proto::{Metric, MetricFamily};
77
use prometheus::{
8-
HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec, __register_gauge,
9-
__register_gauge_vec,
8+
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec,
9+
__register_gauge, __register_gauge_vec, opts, register_counter, register_int_counter,
1010
};
1111

1212
const JOBS_METRIC: &str = "crater_completed_jobs_total";
@@ -18,6 +18,7 @@ const ENDPOINT_TIME: &str = "crater_endpoint_time_seconds";
1818
#[derive(Clone)]
1919
pub struct Metrics {
2020
crater_completed_jobs_total: IntCounterVec,
21+
pub crater_bounced_record_progress: IntCounter,
2122
crater_agent_failure: IntCounterVec,
2223
crater_work_status: IntGaugeVec,
2324
crater_last_crates_update: IntGauge,
@@ -29,6 +30,10 @@ impl Metrics {
2930
let jobs_opts = prometheus::opts!(JOBS_METRIC, "total completed jobs");
3031
let crater_completed_jobs_total =
3132
prometheus::register_int_counter_vec!(jobs_opts, &["agent", "experiment"])?;
33+
let crater_bounced_record_progress = prometheus::register_int_counter!(
34+
"crater_bounced_record_progress",
35+
"hits with full record progress queue"
36+
)?;
3237
let failure_opts = prometheus::opts!(AGENT_FAILED, "total completed jobs");
3338
let crater_agent_failure =
3439
prometheus::register_int_counter_vec!(failure_opts, &["agent", "experiment"])?;
@@ -47,6 +52,7 @@ impl Metrics {
4752

4853
Ok(Metrics {
4954
crater_completed_jobs_total,
55+
crater_bounced_record_progress,
5056
crater_agent_failure,
5157
crater_work_status,
5258
crater_last_crates_update,

src/server/mod.rs

+16-7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct Data {
4444
pub agents: Agents,
4545
pub db: Database,
4646
pub reports_worker: reports::ReportsWorker,
47+
pub record_progress_worker: routes::agent::RecordProgressThread,
4748
pub acl: ACL,
4849
pub metrics: Metrics,
4950
}
@@ -80,6 +81,10 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
8081
info!("initialized metrics...");
8182

8283
let data = Data {
84+
record_progress_worker: routes::agent::RecordProgressThread::new(
85+
db.clone(),
86+
metrics.clone(),
87+
),
8388
config,
8489
tokens,
8590
agents,
@@ -100,7 +105,9 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
100105
let data = Arc::new(data);
101106
let github_data = github_data.map(Arc::new);
102107

108+
let record_progress_worker = data.record_progress_worker.clone();
103109
let routes = warp::any()
110+
.and(warp::any().map(move || record_progress_worker.clone().start_request()))
104111
.and(
105112
warp::any()
106113
.and(
@@ -118,13 +125,15 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
118125
.or(routes::ui::routes(data))
119126
.unify(),
120127
)
121-
.map(|mut resp: Response<Body>| {
122-
resp.headers_mut().insert(
123-
http::header::SERVER,
124-
HeaderValue::from_static(&SERVER_HEADER),
125-
);
126-
resp
127-
});
128+
.map(
129+
|_guard: routes::agent::RequestGuard, mut resp: Response<Body>| {
130+
resp.headers_mut().insert(
131+
http::header::SERVER,
132+
HeaderValue::from_static(&SERVER_HEADER),
133+
);
134+
resp
135+
},
136+
);
128137

129138
warp::serve(routes).run(bind);
130139

0 commit comments

Comments
 (0)