Skip to content

Commit 357c4c5

Browse files
committed
Merge branch 'master' into update-rust-dependencies-scripting-2024-08-01-hyper
Merge the #237 PR into the Scripting branch.
2 parents f6edae8 + 376feed commit 357c4c5

12 files changed

+314
-159
lines changed

Cargo.lock

+189-90
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+10-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ path = "src/bin/test_server.rs"
2020
[dependencies]
2121
base64.workspace = true
2222
body_reader = { path = "./lib/body_reader" }
23-
bytes = "1"
23+
bytes.workspace = true
2424
channel = { path = "./lib/channel" }
2525
clap = { version = "4", features = ["derive", "cargo", "std", "help", "usage", "error-context", "wrap_help"], default-features = false }
2626
config.workspace = true
@@ -33,8 +33,10 @@ futures.workspace = true
3333
futures-timer.workspace = true
3434
hdrhistogram = "7"
3535
http.workspace = true
36-
hyper = { workspace = true, features = ["client", "http1", "http2", "stream"] }
37-
hyper-tls = "0.5"
36+
hyper = { workspace = true, features = ["client", "http1", "http2"] }
37+
hyper-tls = "0.6"
38+
hyper-util = { workspace = true, features = ["tokio", "client", "http1", "http2"] }
39+
http-body-util.workspace = true
3840
itertools.workspace = true
3941
mod_interval = { path = "./lib/mod_interval" }
4042
native-tls = "0.2"
@@ -101,12 +103,15 @@ license = "Apache 2.0"
101103

102104
[workspace.dependencies]
103105
base64 = "0.22"
106+
bytes = "1"
104107
config = { path = "./lib/config" }
105108
ether = { path = "./lib/either" }
106109
futures = "0.3"
107110
futures-timer = "3"
108-
http = "0.2"
109-
hyper = "0.14"
111+
http = "1"
112+
http-body-util = "0.1"
113+
hyper = "1"
114+
hyper-util = "0.1"
110115
itertools = "0.13"
111116
js-sys = "0.3.64"
112117
log = "0.4"

deny.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ private = { ignore = true }
2323
version = 2
2424
allow = [
2525
"Apache-2.0",
26+
"BSD-2-Clause",
2627
"BSD-3-Clause",
2728
"MIT",
2829
"MPL-2.0", # MPL is only copyleft in regards to modifications to the original, adding as a dependency is fine

lib/test_common/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ doctest = false
1010
path = "test_common.rs"
1111

1212
[dependencies]
13+
bytes.workspace = true
1314
futures.workspace = true
1415
futures-timer.workspace = true
15-
hyper = { workspace = true, features = ["server"] }
16+
hyper = { workspace = true, features = ["http1", "http2"] }
17+
hyper-util = { workspace = true, features = ["tokio", "server", "http1", "http2"] }
1618
http.workspace = true
19+
http-body-util.workspace = true
1720
parking_lot = "0.12"
1821
tokio = { workspace = true, features = ["full"] }
1922
url.workspace = true

lib/test_common/test_common.rs

+39-19
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
11
use log::{debug, info};
2-
use std::{future::Future, io, str::FromStr, sync::Arc, time::Duration};
2+
use std::{future::Future, io, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
3+
use tokio::net::TcpListener;
34

5+
use bytes::Bytes;
46
use futures::{channel::oneshot, future::select, FutureExt};
57
use futures_timer::Delay;
68
use http::{header, StatusCode};
7-
use hyper::{
8-
server::conn::AddrStream,
9-
service::{make_service_fn, service_fn},
10-
Body, Error, Request, Response, Server,
9+
use http_body_util::{BodyExt, Empty};
10+
use hyper::{body::Incoming as Body, service::service_fn, Error, Request, Response};
11+
use hyper_util::{
12+
rt::{TokioExecutor, TokioIo},
13+
server::conn::auto::Builder as HyperBuilder,
1114
};
1215
use parking_lot::Mutex;
1316
use url::Url;
1417

15-
async fn echo_route(req: Request<Body>) -> Response<Body> {
18+
type HyperBody = http_body_util::combinators::BoxBody<Bytes, Error>;
19+
20+
async fn echo_route(req: Request<Body>) -> Response<HyperBody> {
1621
let headers = req.headers();
1722
let content_type = headers
1823
.get(header::CONTENT_TYPE)
@@ -36,24 +41,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
3641
if echo.is_some() {
3742
debug!("Echo Body = {}", echo.clone().unwrap_or_default());
3843
}
39-
let mut response = match (req.method(), echo) {
44+
let mut response: Response<HyperBody> = match (req.method(), echo) {
4045
(&http::Method::GET, Some(b)) => Response::builder()
4146
.status(StatusCode::OK)
4247
.header(header::CONTENT_TYPE, content_type)
43-
.body(b.into())
48+
.body(b.map_err(|never| match never {}).boxed())
4449
.unwrap(),
4550
(&http::Method::POST, _) | (&http::Method::PUT, _) => Response::builder()
4651
.status(StatusCode::OK)
4752
.header(header::CONTENT_TYPE, content_type)
48-
.body(req.into_body())
53+
.body(req.into_body().boxed())
4954
.unwrap(),
5055
_ => Response::builder()
5156
.status(StatusCode::NO_CONTENT)
52-
.body(Body::empty())
57+
.body(empty())
5358
.unwrap(),
5459
};
5560
let ms = wait.and_then(|c| FromStr::from_str(&c).ok()).unwrap_or(0);
56-
let old_body = std::mem::replace(response.body_mut(), Body::empty());
61+
let old_body = std::mem::replace(response.body_mut(), empty());
5762
if ms > 0 {
5863
debug!("waiting {} ms", ms);
5964
}
@@ -62,13 +67,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
6267
response
6368
}
6469

65-
pub fn start_test_server(
70+
fn empty() -> HyperBody {
71+
Empty::<Bytes>::new()
72+
.map_err(|never| match never {})
73+
.boxed()
74+
}
75+
76+
pub async fn start_test_server(
6677
port: Option<u16>,
6778
) -> (u16, oneshot::Sender<()>, impl Future<Output = ()>) {
6879
let port = port.unwrap_or(0);
69-
let address = ([127, 0, 0, 1], port).into();
80+
let address: SocketAddr = ([127, 0, 0, 1], port).into();
81+
82+
let listener = TcpListener::bind(address).await.unwrap();
83+
let local_addr = listener.local_addr().unwrap();
7084

71-
let make_svc = make_service_fn(|_: &AddrStream| async {
85+
let server = tokio::spawn(async move {
7286
let service = service_fn(|req: Request<Body>| async {
7387
debug!("{:?}", req);
7488
let method = req.method().to_string();
@@ -78,7 +92,7 @@ pub fn start_test_server(
7892
"/" => echo_route(req).await,
7993
_ => Response::builder()
8094
.status(StatusCode::NOT_FOUND)
81-
.body(Body::empty())
95+
.body(empty())
8296
.unwrap(),
8397
};
8498
debug!("{:?}", response);
@@ -92,14 +106,20 @@ pub fn start_test_server(
92106
);
93107
Ok::<_, Error>(response)
94108
});
95-
Ok::<_, Error>(service)
109+
110+
loop {
111+
let (stream, _) = listener.accept().await.unwrap();
112+
let stream = TokioIo::new(stream);
113+
tokio::task::spawn(async move {
114+
let builder = HyperBuilder::new(TokioExecutor::new());
115+
builder.serve_connection(stream, service).await.unwrap();
116+
});
117+
}
96118
});
97119

98120
let (tx, rx) = oneshot::channel();
99121

100-
let server = Server::bind(&address).serve(make_svc);
101-
102-
let port = server.local_addr().port();
122+
let port = local_addr.port();
103123

104124
let future = select(server, rx);
105125

src/bin/test_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ fn main() {
88
rt.block_on(async {
99
let port = std::env::var("PORT").ok().and_then(|s| s.parse().ok());
1010
debug!("port = {}", port.unwrap_or_default());
11-
let (port, rx, handle) = start_test_server(port);
11+
let (port, rx, handle) = start_test_server(port).await;
1212

1313
println!("Listening on port {port}");
1414

src/lib.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,15 @@ use futures::{
3030
stream, FutureExt, Stream, StreamExt,
3131
};
3232
use futures_timer::Delay;
33-
use hyper::{client::HttpConnector, Body, Client};
33+
use http_body_util::combinators::BoxBody;
3434
use hyper_tls::HttpsConnector;
35+
use hyper_util::{
36+
client::legacy::{
37+
connect::{dns::GaiResolver, HttpConnector},
38+
Client,
39+
},
40+
rt::TokioExecutor,
41+
};
3542
use itertools::Itertools;
3643
use line_writer::{blocking_writer, MsgType};
3744
use log::{debug, error, info, warn};
@@ -60,6 +67,8 @@ use std::{
6067
time::{Duration, Instant},
6168
};
6269

70+
type Body = BoxBody<bytes::Bytes, std::io::Error>;
71+
6372
struct Endpoints {
6473
inner: Vec<(
6574
BTreeMap<Arc<str>, String>,
@@ -1175,16 +1184,15 @@ fn create_load_test_future(
11751184

11761185
pub(crate) fn create_http_client(
11771186
keepalive: Duration,
1178-
) -> Result<
1179-
Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>,
1180-
TestError,
1181-
> {
1187+
) -> Result<Client<HttpsConnector<HttpConnector<GaiResolver>>, Body>, TestError> {
11821188
let mut http = HttpConnector::new();
11831189
http.set_keepalive(Some(keepalive));
11841190
http.set_reuse_address(true);
11851191
http.enforce_http(false);
11861192
let https = HttpsConnector::from((http, TlsConnector::new()?.into()));
1187-
Ok(Client::builder().set_host(false).build::<_, Body>(https))
1193+
Ok(Client::builder(TokioExecutor::new())
1194+
.set_host(false)
1195+
.build::<_, Body>(https))
11881196
}
11891197

11901198
type ProvidersResult =

src/request.rs

+25-11
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ use futures::{
1818
sink::SinkExt,
1919
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
2020
};
21+
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
2122
use hyper::{
22-
client::HttpConnector,
2323
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
24-
Body as HyperBody, Client, Method, Response,
24+
Method, Response,
2525
};
2626
use hyper_tls::HttpsConnector;
27+
use hyper_util::client::legacy::{
28+
connect::{dns::GaiResolver, HttpConnector},
29+
Client,
30+
};
2731
use rand::distributions::{Alphanumeric, Distribution};
2832
use select_any::select_any;
2933
use serde_json as json;
@@ -62,6 +66,8 @@ use std::{
6266
time::{Duration, Instant},
6367
};
6468

69+
type HyperBody = BoxBody<Bytes, std::io::Error>;
70+
6571
#[derive(Clone)]
6672
pub struct AutoReturn {
6773
send_option: ProviderSend,
@@ -206,8 +212,7 @@ pub struct BuilderContext {
206212
#[allow(dead_code)]
207213
pub config_path: PathBuf,
208214
// the http client
209-
pub client:
210-
Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
215+
pub client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
211216
// a mapping of names to their prospective providers
212217
pub providers: Arc<BTreeMap<Arc<str>, providers::Provider>>,
213218
// a mapping of names to their prospective loggers
@@ -504,7 +509,7 @@ fn multipart_body_as_hyper_body(
504509
let piece_stream = future::ok(Bytes::from(piece_data)).into_stream();
505510
tweak_path(&mut body, path);
506511
let a = create_file_hyper_body(body).map_ok(move |(bytes, body)| {
507-
let stream = piece_stream.chain(body).a();
512+
let stream = piece_stream.chain(body.into_data_stream()).a();
508513
(bytes + piece_data_bytes, stream)
509514
});
510515
Either::A(a)
@@ -542,7 +547,9 @@ fn multipart_body_as_hyper_body(
542547
.flatten()
543548
.chain(stream::once(future::ok(closing_boundary)));
544549

545-
(bytes, HyperBody::wrap_stream(stream))
550+
let body: HyperBody =
551+
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
552+
(bytes, body)
546553
});
547554
Ok(ret)
548555
}
@@ -572,7 +579,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
572579
}
573580
});
574581

575-
let body = HyperBody::wrap_stream(stream);
582+
let body: HyperBody = BodyExt::boxed(StreamBody::new(
583+
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
584+
));
576585
Ok((bytes, body))
577586
}
578587

@@ -590,7 +599,7 @@ fn body_template_as_hyper_body(
590599
return Either3::A(future::ready(r).and_then(|x| x));
591600
}
592601
Some(EndPointBody::String(t)) => t,
593-
None => return Either3::B(future::ok((0, HyperBody::empty()))),
602+
None => return Either3::B(future::ok((0, BoxBody::default()))),
594603
};
595604
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()) /*, None*/) {
596605
Ok(b) => b,
@@ -605,7 +614,10 @@ fn body_template_as_hyper_body(
605614
Either3::C(create_file_hyper_body(body))
606615
} else {
607616
*body_value = Some(body.clone());
608-
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
617+
Either3::B(future::ok((
618+
body.as_bytes().len() as u64,
619+
body.map_err(|never| match never {}).boxed(),
620+
)))
609621
}
610622
}
611623

@@ -618,7 +630,8 @@ pub type StatsTx = futures_channel::UnboundedSender<stats::StatsMessage>;
618630

619631
pub struct Endpoint {
620632
body: Option<EndPointBody>,
621-
client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
633+
// client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
634+
client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
622635
headers: config::Headers<True>,
623636
max_parallel_requests: Option<NonZeroUsize>,
624637
method: Method,
@@ -909,7 +922,8 @@ mod tests {
909922
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
910923
.await
911924
.unwrap();
912-
body.map(|b| stream::iter(b.unwrap()))
925+
body.into_data_stream()
926+
.map(|b| stream::iter(b.unwrap()))
913927
.flatten()
914928
.collect::<Vec<_>>()
915929
.await

0 commit comments

Comments
 (0)