1
1
use log:: { debug, info} ;
2
- use std:: { future:: Future , io, str:: FromStr , sync:: Arc , time:: Duration } ;
2
+ use std:: { future:: Future , io, net:: SocketAddr , str:: FromStr , sync:: Arc , time:: Duration } ;
3
+ use tokio:: net:: TcpListener ;
3
4
5
+ use bytes:: Bytes ;
4
6
use futures:: { channel:: oneshot, future:: select, FutureExt } ;
5
7
use futures_timer:: Delay ;
6
8
use http:: { header, StatusCode } ;
7
- use hyper:: {
8
- server:: conn:: AddrStream ,
9
- service:: { make_service_fn, service_fn} ,
10
- Body , Error , Request , Response , Server ,
9
+ use http_body_util:: { combinators:: BoxBody , BodyExt , Empty } ;
10
+ use hyper:: { body:: Incoming as Body , service:: service_fn, Error , Request , Response } ;
11
+ use hyper_util:: {
12
+ rt:: { TokioExecutor , TokioIo } ,
13
+ server:: conn:: auto:: Builder as HyperBuilder ,
11
14
} ;
12
15
use parking_lot:: Mutex ;
13
16
use url:: Url ;
14
17
15
- async fn echo_route ( req : Request < Body > ) -> Response < Body > {
18
+ async fn echo_route ( req : Request < Body > ) -> Response < BoxBody < Bytes , hyper :: Error > > {
16
19
let headers = req. headers ( ) ;
17
20
let content_type = headers
18
21
. get ( header:: CONTENT_TYPE )
@@ -36,24 +39,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
36
39
if echo. is_some ( ) {
37
40
debug ! ( "Echo Body = {}" , echo. clone( ) . unwrap_or_default( ) ) ;
38
41
}
39
- let mut response = match ( req. method ( ) , echo) {
42
+ let mut response: Response < BoxBody < Bytes , Error > > = match ( req. method ( ) , echo) {
40
43
( & http:: Method :: GET , Some ( b) ) => Response :: builder ( )
41
44
. status ( StatusCode :: OK )
42
45
. header ( header:: CONTENT_TYPE , content_type)
43
- . body ( b. into ( ) )
46
+ . body ( b. map_err ( |never| match never { } ) . boxed ( ) )
44
47
. unwrap ( ) ,
45
48
( & http:: Method :: POST , _) | ( & http:: Method :: PUT , _) => Response :: builder ( )
46
49
. status ( StatusCode :: OK )
47
50
. header ( header:: CONTENT_TYPE , content_type)
48
- . body ( req. into_body ( ) )
51
+ . body ( req. into_body ( ) . boxed ( ) )
49
52
. unwrap ( ) ,
50
53
_ => Response :: builder ( )
51
54
. status ( StatusCode :: NO_CONTENT )
52
- . body ( Body :: empty ( ) )
55
+ . body ( empty ( ) )
53
56
. unwrap ( ) ,
54
57
} ;
55
58
let ms = wait. and_then ( |c| FromStr :: from_str ( & c) . ok ( ) ) . unwrap_or ( 0 ) ;
56
- let old_body = std:: mem:: replace ( response. body_mut ( ) , Body :: empty ( ) ) ;
59
+ let old_body = std:: mem:: replace ( response. body_mut ( ) , empty ( ) ) ;
57
60
if ms > 0 {
58
61
debug ! ( "waiting {} ms" , ms) ;
59
62
}
@@ -62,13 +65,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
62
65
response
63
66
}
64
67
65
- pub fn start_test_server (
68
+ fn empty ( ) -> BoxBody < Bytes , hyper:: Error > {
69
+ Empty :: < Bytes > :: new ( )
70
+ . map_err ( |never| match never { } )
71
+ . boxed ( )
72
+ }
73
+
74
+ pub async fn start_test_server (
66
75
port : Option < u16 > ,
67
76
) -> ( u16 , oneshot:: Sender < ( ) > , impl Future < Output = ( ) > ) {
68
77
let port = port. unwrap_or ( 0 ) ;
69
- let address = ( [ 127 , 0 , 0 , 1 ] , port) . into ( ) ;
78
+ let address: SocketAddr = ( [ 127 , 0 , 0 , 1 ] , port) . into ( ) ;
79
+
80
+ let listener = TcpListener :: bind ( address) . await . unwrap ( ) ;
81
+ let local_addr = listener. local_addr ( ) . unwrap ( ) ;
70
82
71
- let make_svc = make_service_fn ( |_ : & AddrStream | async {
83
+ let server = tokio :: spawn ( async move {
72
84
let service = service_fn ( |req : Request < Body > | async {
73
85
debug ! ( "{:?}" , req) ;
74
86
let method = req. method ( ) . to_string ( ) ;
@@ -78,7 +90,7 @@ pub fn start_test_server(
78
90
"/" => echo_route ( req) . await ,
79
91
_ => Response :: builder ( )
80
92
. status ( StatusCode :: NOT_FOUND )
81
- . body ( Body :: empty ( ) )
93
+ . body ( empty ( ) )
82
94
. unwrap ( ) ,
83
95
} ;
84
96
debug ! ( "{:?}" , response) ;
@@ -92,14 +104,20 @@ pub fn start_test_server(
92
104
) ;
93
105
Ok :: < _ , Error > ( response)
94
106
} ) ;
95
- Ok :: < _ , Error > ( service)
107
+
108
+ loop {
109
+ let ( stream, _) = listener. accept ( ) . await . unwrap ( ) ;
110
+ let stream = TokioIo :: new ( stream) ;
111
+ tokio:: task:: spawn ( async move {
112
+ let builder = HyperBuilder :: new ( TokioExecutor :: new ( ) ) ;
113
+ builder. serve_connection ( stream, service) . await . unwrap ( ) ;
114
+ } ) ;
115
+ }
96
116
} ) ;
97
117
98
118
let ( tx, rx) = oneshot:: channel ( ) ;
99
119
100
- let server = Server :: bind ( & address) . serve ( make_svc) ;
101
-
102
- let port = server. local_addr ( ) . port ( ) ;
120
+ let port = local_addr. port ( ) ;
103
121
104
122
let future = select ( server, rx) ;
105
123
0 commit comments