Skip to content

Commit 42fe4cf

Browse files
authored
chore(app/inbound): address hyper deprecations in http/1 tests (#3454)
this is a follow-up commit related to 24dc5d8 (#3445). see <linkerd/linkerd2#8733> for more information on upgrading to hyper 1.0. --- this addresses hyper deprecations in the http/1 tests for the inbound proxy. prior, we made use of `tower::ServiceExt::oneshot`, which consumes a service and drops it after sending a request and polling the response future to completion. <https://docs.rs/tower/0.5.2/src/tower/util/oneshot.rs.html#96-100> tower is not a 1.0 library yet, so `SendRequest` does not provide an implementation of `tower::Service` in hyper's 1.0 interface: - <https://docs.rs/hyper/0.14.31/hyper/client/conn/struct.SendRequest.html#impl-Service%3CRequest%3CB%3E%3E-for-SendRequest%3CB%3E> - <https://docs.rs/hyper/1.5.1/hyper/client/conn/http1/struct.SendRequest.html#trait-implementations> consequentially, we must drop the sender ourselves after receiving a response now. --- this commit *also* addresses hyper deprecations in the http/1 downgrade tests for the inbound proxy. because these tests involve a http/2 client and an http/1 server, we take the choice of inlining the body of `http_util::connect_and_accept()` rather than introducing a new, third `http_util::connect_and_accept_http_downgrade()` function. we will refactor these helper functions in follow-on commits. NB: because `ContextError` is internal to the `linkerd-app-test` crate, we do not wrap the errors. these are allegedly used by the fuzzing tests (_see f.ex #986 and #989_), but for our purposes with respect to the inbound proxy we can elide them rather than making `ctx()` a public method. --- Signed-off-by: katelyn martin <kate@buoyant.io>
1 parent e9e2a31 commit 42fe4cf

File tree

2 files changed

+129
-29
lines changed

2 files changed

+129
-29
lines changed

linkerd/app/inbound/src/http/tests.rs

+84-29
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ where
4848
#[tokio::test(flavor = "current_thread")]
4949
async fn unmeshed_http1_hello_world() {
5050
let server = hyper::server::conn::http1::Builder::new();
51-
#[allow(deprecated)] // linkerd/linkerd2#8733
52-
let mut client = hyper::client::conn::Builder::new();
51+
let mut client = hyper::client::conn::http1::Builder::new();
5352
let _trace = trace_init();
5453

5554
// Build a mock "connector" that returns the upstream "server" IO.
@@ -64,15 +63,15 @@ async fn unmeshed_http1_hello_world() {
6463
let cfg = default_config();
6564
let (rt, _shutdown) = runtime();
6665
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1);
67-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
66+
let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;
6867

6968
let req = Request::builder()
7069
.method(http::Method::GET)
7170
.uri("http://foo.svc.cluster.local:5550")
7271
.body(Body::default())
7372
.unwrap();
7473
let rsp = client
75-
.oneshot(req)
74+
.send_request(req)
7675
.await
7776
.expect("HTTP client request failed");
7877
tracing::info!(?rsp);
@@ -81,6 +80,7 @@ async fn unmeshed_http1_hello_world() {
8180
assert_eq!(body, "Hello world!");
8281

8382
// Wait for all of the background tasks to complete, panicking if any returned an error.
83+
drop(client);
8484
bg.join_all()
8585
.await
8686
.into_iter()
@@ -92,9 +92,7 @@ async fn unmeshed_http1_hello_world() {
9292
async fn downgrade_origin_form() {
9393
// Reproduces https://github.com/linkerd/linkerd2/issues/5298
9494
let server = hyper::server::conn::http1::Builder::new();
95-
#[allow(deprecated)] // linkerd/linkerd2#8733
96-
let mut client = hyper::client::conn::Builder::new();
97-
client.http2_only(true);
95+
let client = hyper::client::conn::http2::Builder::new(TracingExecutor);
9896
let _trace = trace_init();
9997

10098
// Build a mock "connector" that returns the upstream "server" IO.
@@ -109,7 +107,35 @@ async fn downgrade_origin_form() {
109107
let cfg = default_config();
110108
let (rt, _shutdown) = runtime();
111109
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
112-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
110+
let (mut client, bg) = {
111+
tracing::info!(settings = ?client, "connecting client with");
112+
let (client_io, server_io) = io::duplex(4096);
113+
114+
let (client, conn) = client
115+
.handshake(client_io)
116+
.await
117+
.expect("Client must connect");
118+
119+
let mut bg = tokio::task::JoinSet::new();
120+
bg.spawn(
121+
async move {
122+
server.oneshot(server_io).await?;
123+
tracing::info!("proxy serve task complete");
124+
Ok(())
125+
}
126+
.instrument(tracing::info_span!("proxy")),
127+
);
128+
bg.spawn(
129+
async move {
130+
conn.await?;
131+
tracing::info!("client background complete");
132+
Ok(())
133+
}
134+
.instrument(tracing::info_span!("client_bg")),
135+
);
136+
137+
(client, bg)
138+
};
113139

114140
let req = Request::builder()
115141
.method(http::Method::GET)
@@ -119,7 +145,7 @@ async fn downgrade_origin_form() {
119145
.body(Body::default())
120146
.unwrap();
121147
let rsp = client
122-
.oneshot(req)
148+
.send_request(req)
123149
.await
124150
.expect("HTTP client request failed");
125151
tracing::info!(?rsp);
@@ -128,6 +154,7 @@ async fn downgrade_origin_form() {
128154
assert_eq!(body, "Hello world!");
129155

130156
// Wait for all of the background tasks to complete, panicking if any returned an error.
157+
drop(client);
131158
bg.join_all()
132159
.await
133160
.into_iter()
@@ -137,10 +164,8 @@ async fn downgrade_origin_form() {
137164

138165
#[tokio::test(flavor = "current_thread")]
139166
async fn downgrade_absolute_form() {
167+
let client = hyper::client::conn::http2::Builder::new(TracingExecutor);
140168
let server = hyper::server::conn::http1::Builder::new();
141-
#[allow(deprecated)] // linkerd/linkerd2#8733
142-
let mut client = hyper::client::conn::Builder::new();
143-
client.http2_only(true);
144169
let _trace = trace_init();
145170

146171
// Build a mock "connector" that returns the upstream "server" IO.
@@ -155,7 +180,36 @@ async fn downgrade_absolute_form() {
155180
let cfg = default_config();
156181
let (rt, _shutdown) = runtime();
157182
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
158-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
183+
184+
let (mut client, bg) = {
185+
tracing::info!(settings = ?client, "connecting client with");
186+
let (client_io, server_io) = io::duplex(4096);
187+
188+
let (client, conn) = client
189+
.handshake(client_io)
190+
.await
191+
.expect("Client must connect");
192+
193+
let mut bg = tokio::task::JoinSet::new();
194+
bg.spawn(
195+
async move {
196+
server.oneshot(server_io).await?;
197+
tracing::info!("proxy serve task complete");
198+
Ok(())
199+
}
200+
.instrument(tracing::info_span!("proxy")),
201+
);
202+
bg.spawn(
203+
async move {
204+
conn.await?;
205+
tracing::info!("client background complete");
206+
Ok(())
207+
}
208+
.instrument(tracing::info_span!("client_bg")),
209+
);
210+
211+
(client, bg)
212+
};
159213

160214
let req = Request::builder()
161215
.method(http::Method::GET)
@@ -165,7 +219,7 @@ async fn downgrade_absolute_form() {
165219
.body(Body::default())
166220
.unwrap();
167221
let rsp = client
168-
.oneshot(req)
222+
.send_request(req)
169223
.await
170224
.expect("HTTP client request failed");
171225
tracing::info!(?rsp);
@@ -174,6 +228,7 @@ async fn downgrade_absolute_form() {
174228
assert_eq!(body, "Hello world!");
175229

176230
// Wait for all of the background tasks to complete, panicking if any returned an error.
231+
drop(client);
177232
bg.join_all()
178233
.await
179234
.into_iter()
@@ -190,16 +245,15 @@ async fn http1_bad_gateway_meshed_response_error_header() {
190245

191246
// Build a client using the connect that always errors so that responses
192247
// are BAD_GATEWAY.
193-
#[allow(deprecated)] // linkerd/linkerd2#8733
194-
let mut client = hyper::client::conn::Builder::new();
248+
let mut client = hyper::client::conn::http1::Builder::new();
195249
let profiles = profile::resolver();
196250
let profile_tx =
197251
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
198252
profile_tx.send(profile::Profile::default()).unwrap();
199253
let cfg = default_config();
200254
let (rt, _shutdown) = runtime();
201255
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1());
202-
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;
256+
let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;
203257

204258
// Send a request and assert that it is a BAD_GATEWAY with the expected
205259
// header message.
@@ -221,6 +275,7 @@ async fn http1_bad_gateway_meshed_response_error_header() {
221275
check_error_header(rsp.headers(), "server is not listening");
222276

223277
// Wait for all of the background tasks to complete, panicking if any returned an error.
278+
drop(client);
224279
bg.join_all()
225280
.await
226281
.into_iter()
@@ -237,16 +292,15 @@ async fn http1_bad_gateway_unmeshed_response() {
237292

238293
// Build a client using the connect that always errors so that responses
239294
// are BAD_GATEWAY.
240-
#[allow(deprecated)] // linkerd/linkerd2#8733
241-
let mut client = hyper::client::conn::Builder::new();
295+
let mut client = hyper::client::conn::http1::Builder::new();
242296
let profiles = profile::resolver();
243297
let profile_tx =
244298
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
245299
profile_tx.send(profile::Profile::default()).unwrap();
246300
let cfg = default_config();
247301
let (rt, _shutdown) = runtime();
248302
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1);
249-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
303+
let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;
250304

251305
// Send a request and assert that it is a BAD_GATEWAY with the expected
252306
// header message.
@@ -256,7 +310,7 @@ async fn http1_bad_gateway_unmeshed_response() {
256310
.body(Body::default())
257311
.unwrap();
258312
let rsp = client
259-
.oneshot(req)
313+
.send_request(req)
260314
.await
261315
.expect("HTTP client request failed");
262316
tracing::info!(?rsp);
@@ -267,6 +321,7 @@ async fn http1_bad_gateway_unmeshed_response() {
267321
);
268322

269323
// Wait for all of the background tasks to complete, panicking if any returned an error.
324+
drop(client);
270325
bg.join_all()
271326
.await
272327
.into_iter()
@@ -285,16 +340,15 @@ async fn http1_connect_timeout_meshed_response_error_header() {
285340

286341
// Build a client using the connect that always sleeps so that responses
287342
// are GATEWAY_TIMEOUT.
288-
#[allow(deprecated)] // linkerd/linkerd2#8733
289-
let mut client = hyper::client::conn::Builder::new();
343+
let mut client = hyper::client::conn::http1::Builder::new();
290344
let profiles = profile::resolver();
291345
let profile_tx =
292346
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
293347
profile_tx.send(profile::Profile::default()).unwrap();
294348
let cfg = default_config();
295349
let (rt, _shutdown) = runtime();
296350
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1());
297-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
351+
let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;
298352

299353
// Send a request and assert that it is a GATEWAY_TIMEOUT with the
300354
// expected header message.
@@ -304,7 +358,7 @@ async fn http1_connect_timeout_meshed_response_error_header() {
304358
.body(Body::default())
305359
.unwrap();
306360
let rsp = client
307-
.oneshot(req)
361+
.send_request(req)
308362
.await
309363
.expect("HTTP client request failed");
310364
tracing::info!(?rsp);
@@ -317,6 +371,7 @@ async fn http1_connect_timeout_meshed_response_error_header() {
317371
check_error_header(rsp.headers(), "connect timed out after 1s");
318372

319373
// Wait for all of the background tasks to complete, panicking if any returned an error.
374+
drop(client);
320375
bg.join_all()
321376
.await
322377
.into_iter()
@@ -335,16 +390,15 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
335390

336391
// Build a client using the connect that always sleeps so that responses
337392
// are GATEWAY_TIMEOUT.
338-
#[allow(deprecated)] // linkerd/linkerd2#8733
339-
let mut client = hyper::client::conn::Builder::new();
393+
let mut client = hyper::client::conn::http1::Builder::new();
340394
let profiles = profile::resolver();
341395
let profile_tx =
342396
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
343397
profile_tx.send(profile::Profile::default()).unwrap();
344398
let cfg = default_config();
345399
let (rt, _shutdown) = runtime();
346400
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1);
347-
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
401+
let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await;
348402

349403
// Send a request and assert that it is a GATEWAY_TIMEOUT with the
350404
// expected header message.
@@ -354,7 +408,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
354408
.body(Body::default())
355409
.unwrap();
356410
let rsp = client
357-
.oneshot(req)
411+
.send_request(req)
358412
.await
359413
.expect("HTTP client request failed");
360414
tracing::info!(?rsp);
@@ -365,6 +419,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
365419
);
366420

367421
// Wait for all of the background tasks to complete, panicking if any returned an error.
422+
drop(client);
368423
bg.join_all()
369424
.await
370425
.into_iter()

linkerd/app/test/src/http_util.rs

+45
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,51 @@ pub async fn connect_and_accept(
5555
(client, bg)
5656
}
5757

58+
/// Connects a client and server, running a proxy between them.
59+
///
60+
/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and
61+
/// await a response, and (2) a [`JoinSet<T>`] running background tasks.
62+
pub async fn connect_and_accept_http1(
63+
client_settings: &mut hyper::client::conn::http1::Builder,
64+
server: BoxServer,
65+
) -> (
66+
hyper::client::conn::http1::SendRequest<hyper::Body>,
67+
JoinSet<Result<(), Error>>,
68+
) {
69+
tracing::info!(settings = ?client_settings, "connecting client with");
70+
let (client_io, server_io) = io::duplex(4096);
71+
72+
let (client, conn) = client_settings
73+
.handshake(client_io)
74+
.await
75+
.expect("Client must connect");
76+
77+
let mut bg = tokio::task::JoinSet::new();
78+
bg.spawn(
79+
async move {
80+
server
81+
.oneshot(server_io)
82+
.await
83+
.map_err(ContextError::ctx("proxy background task failed"))?;
84+
tracing::info!("proxy serve task complete");
85+
Ok(())
86+
}
87+
.instrument(tracing::info_span!("proxy")),
88+
);
89+
bg.spawn(
90+
async move {
91+
conn.await
92+
.map_err(ContextError::ctx("client background task failed"))
93+
.map_err(Error::from)?;
94+
tracing::info!("client background complete");
95+
Ok(())
96+
}
97+
.instrument(tracing::info_span!("client_bg")),
98+
);
99+
100+
(client, bg)
101+
}
102+
58103
/// Connects a client and server, running a proxy between them.
59104
///
60105
/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and

0 commit comments

Comments
 (0)