Skip to content

Commit 5246ac6

Browse files
authored
Updated hyper and http (#237)
* Updated cargo lock file to latest versions * Fixed clippy warnings * Updated dashmap to new release * Updated hyper and http in test_common - Major changes. Many of the utility functions have been moved to the new hyper-util - See https://hyper.rs/guides/1/upgrading/ * Updated root Cargo.toml to latest hyper and http - Broken build. Still more to go. * Updated root Cargo.toml to latest hyper and http - Broken build. Still more to go. * Updated code to handle hyper v1 and http v1 * Ignore clippy warning that breaks code if fixed as suggested * Updated hyper, http, and lock file to latest - Removed minor version constraint on hyper and http - Added approved license BSD-2 Clause to the deny.toml * Cleaned up code - Removed unneeded dependencies - Removed commented out code - Cleaned up BoxBody type in test_common
1 parent 03eeff5 commit 5246ac6

13 files changed

+286
-146
lines changed

Cargo.lock

+176-77
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ for_each_parallel = { path = "./lib/for_each_parallel" }
3232
futures = "0.3"
3333
futures-timer = "3"
3434
hdrhistogram = "7"
35-
http = "0.2"
36-
hyper = { version = "0.14", features = ["client", "http1", "http2", "stream"] }
37-
hyper-tls = "0.5"
35+
http = "1"
36+
hyper = { version = "1", features = ["client", "http1", "http2"] }
37+
hyper-tls = "0.6"
38+
hyper-util = { version = "0.1", features = ["tokio", "client", "http1", "http2"] }
39+
http-body-util = "0.1"
3840
itertools = "0.13"
3941
mod_interval = { path = "./lib/mod_interval" }
4042
native-tls = "0.2"

deny.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ private = { ignore = true }
2323
version = 2
2424
allow = [
2525
"Apache-2.0",
26+
"BSD-2-Clause",
2627
"BSD-3-Clause",
2728
"MIT",
2829
"Unicode-DFS-2016",

lib/config/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ doctest = false
1212
base64 = "0.22"
1313
ether = { path = "../either" }
1414
futures = "0.3"
15-
http = "0.2"
15+
http = "1"
1616
itertools = "0.13"
1717
jsonpath_lib = "0.3.0"
1818
percent-encoding = "2"

lib/test_common/Cargo.toml

+5-2
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ doctest = false
1010
path = "test_common.rs"
1111

1212
[dependencies]
13+
bytes = "1"
1314
futures = "0.3"
1415
futures-timer = "3"
15-
hyper = { version = "0.14", features = ["server"] }
16-
http = "0.2"
16+
hyper = { version = "1", features = ["http1", "http2"] }
17+
hyper-util = { version = "0.1", features = ["tokio", "server", "http1", "http2"] }
18+
http = "1"
19+
http-body-util = "0.1"
1720
parking_lot = "0.12"
1821
tokio = { version = "1", features = ["full"] }
1922
url = "2"

lib/test_common/test_common.rs

+39-19
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
11
use log::{debug, info};
2-
use std::{future::Future, io, str::FromStr, sync::Arc, time::Duration};
2+
use std::{future::Future, io, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
3+
use tokio::net::TcpListener;
34

5+
use bytes::Bytes;
46
use futures::{channel::oneshot, future::select, FutureExt};
57
use futures_timer::Delay;
68
use http::{header, StatusCode};
7-
use hyper::{
8-
server::conn::AddrStream,
9-
service::{make_service_fn, service_fn},
10-
Body, Error, Request, Response, Server,
9+
use http_body_util::{BodyExt, Empty};
10+
use hyper::{body::Incoming as Body, service::service_fn, Error, Request, Response};
11+
use hyper_util::{
12+
rt::{TokioExecutor, TokioIo},
13+
server::conn::auto::Builder as HyperBuilder,
1114
};
1215
use parking_lot::Mutex;
1316
use url::Url;
1417

15-
async fn echo_route(req: Request<Body>) -> Response<Body> {
18+
type HyperBody = http_body_util::combinators::BoxBody<Bytes, Error>;
19+
20+
async fn echo_route(req: Request<Body>) -> Response<HyperBody> {
1621
let headers = req.headers();
1722
let content_type = headers
1823
.get(header::CONTENT_TYPE)
@@ -36,24 +41,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
3641
if echo.is_some() {
3742
debug!("Echo Body = {}", echo.clone().unwrap_or_default());
3843
}
39-
let mut response = match (req.method(), echo) {
44+
let mut response: Response<HyperBody> = match (req.method(), echo) {
4045
(&http::Method::GET, Some(b)) => Response::builder()
4146
.status(StatusCode::OK)
4247
.header(header::CONTENT_TYPE, content_type)
43-
.body(b.into())
48+
.body(b.map_err(|never| match never {}).boxed())
4449
.unwrap(),
4550
(&http::Method::POST, _) | (&http::Method::PUT, _) => Response::builder()
4651
.status(StatusCode::OK)
4752
.header(header::CONTENT_TYPE, content_type)
48-
.body(req.into_body())
53+
.body(req.into_body().boxed())
4954
.unwrap(),
5055
_ => Response::builder()
5156
.status(StatusCode::NO_CONTENT)
52-
.body(Body::empty())
57+
.body(empty())
5358
.unwrap(),
5459
};
5560
let ms = wait.and_then(|c| FromStr::from_str(&c).ok()).unwrap_or(0);
56-
let old_body = std::mem::replace(response.body_mut(), Body::empty());
61+
let old_body = std::mem::replace(response.body_mut(), empty());
5762
if ms > 0 {
5863
debug!("waiting {} ms", ms);
5964
}
@@ -62,13 +67,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
6267
response
6368
}
6469

65-
pub fn start_test_server(
70+
fn empty() -> HyperBody {
71+
Empty::<Bytes>::new()
72+
.map_err(|never| match never {})
73+
.boxed()
74+
}
75+
76+
pub async fn start_test_server(
6677
port: Option<u16>,
6778
) -> (u16, oneshot::Sender<()>, impl Future<Output = ()>) {
6879
let port = port.unwrap_or(0);
69-
let address = ([127, 0, 0, 1], port).into();
80+
let address: SocketAddr = ([127, 0, 0, 1], port).into();
81+
82+
let listener = TcpListener::bind(address).await.unwrap();
83+
let local_addr = listener.local_addr().unwrap();
7084

71-
let make_svc = make_service_fn(|_: &AddrStream| async {
85+
let server = tokio::spawn(async move {
7286
let service = service_fn(|req: Request<Body>| async {
7387
debug!("{:?}", req);
7488
let method = req.method().to_string();
@@ -78,7 +92,7 @@ pub fn start_test_server(
7892
"/" => echo_route(req).await,
7993
_ => Response::builder()
8094
.status(StatusCode::NOT_FOUND)
81-
.body(Body::empty())
95+
.body(empty())
8296
.unwrap(),
8397
};
8498
debug!("{:?}", response);
@@ -92,14 +106,20 @@ pub fn start_test_server(
92106
);
93107
Ok::<_, Error>(response)
94108
});
95-
Ok::<_, Error>(service)
109+
110+
loop {
111+
let (stream, _) = listener.accept().await.unwrap();
112+
let stream = TokioIo::new(stream);
113+
tokio::task::spawn(async move {
114+
let builder = HyperBuilder::new(TokioExecutor::new());
115+
builder.serve_connection(stream, service).await.unwrap();
116+
});
117+
}
96118
});
97119

98120
let (tx, rx) = oneshot::channel();
99121

100-
let server = Server::bind(&address).serve(make_svc);
101-
102-
let port = server.local_addr().port();
122+
let port = local_addr.port();
103123

104124
let future = select(server, rx);
105125

src/bin/test_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ fn main() {
88
rt.block_on(async {
99
let port = std::env::var("PORT").ok().and_then(|s| s.parse().ok());
1010
debug!("port = {}", port.unwrap_or_default());
11-
let (port, rx, handle) = start_test_server(port);
11+
let (port, rx, handle) = start_test_server(port).await;
1212

1313
println!("Listening on port {port}");
1414

src/lib.rs

+14-6
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,15 @@ use futures::{
2626
stream, FutureExt, Stream, StreamExt,
2727
};
2828
use futures_timer::Delay;
29-
use hyper::{client::HttpConnector, Body, Client};
29+
use http_body_util::combinators::BoxBody;
3030
use hyper_tls::HttpsConnector;
31+
use hyper_util::{
32+
client::legacy::{
33+
connect::{dns::GaiResolver, HttpConnector},
34+
Client,
35+
},
36+
rt::TokioExecutor,
37+
};
3138
use itertools::Itertools;
3239
use line_writer::{blocking_writer, MsgType};
3340
use log::{debug, error, info, warn};
@@ -57,6 +64,8 @@ use std::{
5764
time::{Duration, Instant},
5865
};
5966

67+
type Body = BoxBody<bytes::Bytes, std::io::Error>;
68+
6069
struct Endpoints {
6170
// yaml index of the endpoint, (endpoint tags, builder)
6271
inner: Vec<(
@@ -1139,16 +1148,15 @@ fn create_load_test_future(
11391148

11401149
pub(crate) fn create_http_client(
11411150
keepalive: Duration,
1142-
) -> Result<
1143-
Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>,
1144-
TestError,
1145-
> {
1151+
) -> Result<Client<HttpsConnector<HttpConnector<GaiResolver>>, Body>, TestError> {
11461152
let mut http = HttpConnector::new();
11471153
http.set_keepalive(Some(keepalive));
11481154
http.set_reuse_address(true);
11491155
http.enforce_http(false);
11501156
let https = HttpsConnector::from((http, TlsConnector::new()?.into()));
1151-
Ok(Client::builder().set_host(false).build::<_, Body>(https))
1157+
Ok(Client::builder(TokioExecutor::new())
1158+
.set_host(false)
1159+
.build::<_, Body>(https))
11521160
}
11531161

11541162
type ProvidersResult = Result<(BTreeMap<String, providers::Provider>, BTreeSet<String>), TestError>;

src/request.rs

+24-11
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ use futures::{
1818
sink::SinkExt,
1919
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
2020
};
21+
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
2122
use hyper::{
22-
client::HttpConnector,
2323
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
24-
Body as HyperBody, Client, Method, Response,
24+
Method, Response,
2525
};
2626
use hyper_tls::HttpsConnector;
27+
use hyper_util::client::legacy::{
28+
connect::{dns::GaiResolver, HttpConnector},
29+
Client,
30+
};
2731
use rand::distributions::{Alphanumeric, Distribution};
2832
use select_any::select_any;
2933
use serde_json as json;
@@ -55,6 +59,8 @@ use std::{
5559
time::{Duration, Instant},
5660
};
5761

62+
type HyperBody = BoxBody<Bytes, std::io::Error>;
63+
5864
#[derive(Clone)]
5965
pub struct AutoReturn {
6066
send_option: EndpointProvidesSendOptions,
@@ -203,8 +209,7 @@ pub struct BuilderContext {
203209
#[allow(dead_code)]
204210
pub config_path: PathBuf,
205211
// the http client
206-
pub client:
207-
Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
212+
pub client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
208213
// a mapping of names to their prospective providers
209214
pub providers: Arc<BTreeMap<String, providers::Provider>>,
210215
// a mapping of names to their prospective loggers
@@ -498,7 +503,7 @@ fn multipart_body_as_hyper_body(
498503
let piece_stream = future::ok(Bytes::from(piece_data)).into_stream();
499504
tweak_path(&mut body, &multipart_body.path);
500505
let a = create_file_hyper_body(body).map_ok(move |(bytes, body)| {
501-
let stream = piece_stream.chain(body).a();
506+
let stream = piece_stream.chain(body.into_data_stream()).a();
502507
(bytes + piece_data_bytes, stream)
503508
});
504509
Either::A(a)
@@ -539,7 +544,9 @@ fn multipart_body_as_hyper_body(
539544
.flatten()
540545
.chain(stream::once(future::ok(closing_boundary)));
541546

542-
(bytes, HyperBody::wrap_stream(stream))
547+
let body: HyperBody =
548+
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
549+
(bytes, body)
543550
});
544551
Ok(ret)
545552
}
@@ -569,7 +576,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
569576
}
570577
});
571578

572-
let body = HyperBody::wrap_stream(stream);
579+
let body: HyperBody = BodyExt::boxed(StreamBody::new(
580+
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
581+
));
573582
Ok((bytes, body))
574583
}
575584

@@ -592,7 +601,7 @@ fn body_template_as_hyper_body(
592601
);
593602
return Either3::A(future::ready(r).and_then(|x| x));
594603
}
595-
BodyTemplate::None => return Either3::B(future::ok((0, HyperBody::empty()))),
604+
BodyTemplate::None => return Either3::B(future::ok((0, BoxBody::default()))),
596605
BodyTemplate::String(t) => t,
597606
};
598607
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()), None) {
@@ -609,7 +618,10 @@ fn body_template_as_hyper_body(
609618
if copy_body_value {
610619
*body_value = Some(body.clone());
611620
}
612-
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
621+
Either3::B(future::ok((
622+
body.as_bytes().len() as u64,
623+
body.map_err(|never| match never {}).boxed(),
624+
)))
613625
}
614626
}
615627

@@ -622,7 +634,7 @@ pub type StatsTx = futures_channel::UnboundedSender<stats::StatsMessage>;
622634

623635
pub struct Endpoint {
624636
body: BodyTemplate,
625-
client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
637+
client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
626638
headers: Vec<(String, Template)>,
627639
max_parallel_requests: Option<NonZeroUsize>,
628640
method: Method,
@@ -919,7 +931,8 @@ mod tests {
919931
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
920932
.await
921933
.unwrap();
922-
body.map(|b| stream::iter(b.unwrap()))
934+
body.into_data_stream()
935+
.map(|b| stream::iter(b.unwrap()))
923936
.flatten()
924937
.collect::<Vec<_>>()
925938
.await

0 commit comments

Comments
 (0)