Skip to content

Commit db934f0

Browse files
committed
XHTTP client: Merge Open* into OpenStream(), and more
#4148 (comment)
1 parent 53b04d5 commit db934f0

File tree

6 files changed

+61
-226
lines changed

6 files changed

+61
-226
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ require (
1919
github.com/stretchr/testify v1.10.0
2020
github.com/v2fly/ss-bloomring v0.0.0-20210312155135-28617310f63e
2121
github.com/vishvananda/netlink v1.3.0
22-
github.com/xtls/quic-go v0.46.0
22+
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087
2323
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d
2424
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
2525
golang.org/x/crypto v0.31.0

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQ
6868
github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
6969
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
7070
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
71-
github.com/xtls/quic-go v0.46.0 h1:yfv6h+/+iOeFhFnmJiwlZgnJjr4fPb4N4rQelffbs1U=
72-
github.com/xtls/quic-go v0.46.0/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY=
71+
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087 h1:kKPg/cJPSKnE50VXVBskDYYSBkl4X3sMCIbTy+XKNGk=
72+
github.com/xtls/quic-go v0.0.0-20241220091641-6f5777d1c087/go.mod h1:mN9lAuc8Vt7eHvnQkDIH5+uHh+DcLmTBma9rLqk/rPY=
7373
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d h1:+B97uD9uHLgAAulhigmys4BVwZZypzK7gPN3WtpgRJg=
7474
github.com/xtls/reality v0.0.0-20240712055506-48f0b2d5ed6d/go.mod h1:dm4y/1QwzjGaK17ofi0Vs6NpKAHegZky8qk6J2JJZAE=
7575
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=

transport/internet/splithttp/browser_client.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@ func (c *BrowserDialerClient) IsClosed() bool {
1717
panic("not implemented yet")
1818
}
1919

20-
func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
21-
panic("not implemented yet")
22-
}
23-
24-
func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
25-
panic("not implemented yet")
26-
}
20+
func (c *BrowserDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
21+
if body != nil {
22+
panic("not implemented yet")
23+
}
2724

28-
func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
29-
conn, err := browser_dialer.DialGet(baseURL)
25+
conn, err := browser_dialer.DialGet(url)
3026
dummyAddr := &gonet.IPAddr{}
3127
if err != nil {
3228
return nil, dummyAddr, dummyAddr, err
@@ -35,8 +31,8 @@ func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string)
3531
return websocket.NewConnection(conn, dummyAddr, nil, 0), conn.RemoteAddr(), conn.LocalAddr(), nil
3632
}
3733

38-
func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
39-
bytes, err := io.ReadAll(payload)
34+
func (c *BrowserDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
35+
bytes, err := io.ReadAll(body)
4036
if err != nil {
4137
return err
4238
}

transport/internet/splithttp/client.go

+36-136
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,11 @@ import (
2020
type DialerClient interface {
2121
IsClosed() bool
2222

23-
// (ctx, baseURL, payload) -> err
24-
// baseURL already contains sessionId and seq
25-
SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
23+
// ctx, url, body, uploadOnly
24+
OpenStream(context.Context, string, io.Reader, bool) (io.ReadCloser, net.Addr, net.Addr, error)
2625

27-
// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
28-
// baseURL already contains sessionId
29-
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)
30-
31-
// (ctx, baseURL) -> uploadWriter
32-
// baseURL already contains sessionId
33-
OpenUpload(context.Context, string) io.WriteCloser
34-
35-
// (ctx, pureURL) -> (uploadWriter, downloadReader)
36-
// pureURL can not contain sessionId
37-
Open(context.Context, string) (io.WriteCloser, io.ReadCloser)
26+
// ctx, url, body, contentLength
27+
PostPacket(context.Context, string, io.Reader, int64) error
3828
}
3929

4030
// implements splithttp.DialerClient in terms of direct network connections
@@ -52,136 +42,56 @@ func (c *DefaultDialerClient) IsClosed() bool {
5242
return c.closed
5343
}
5444

55-
func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
56-
reader, writer := io.Pipe()
57-
req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
58-
req.Header = c.transportConfig.GetRequestHeader()
59-
if !c.transportConfig.NoGRPCHeader {
60-
req.Header.Set("Content-Type", "application/grpc")
61-
}
62-
wrc := &WaitReadCloser{Wait: make(chan struct{})}
63-
go func() {
64-
response, err := c.client.Do(req)
65-
if err != nil || response.StatusCode != 200 {
66-
if err != nil {
67-
errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
68-
} else {
69-
// c.closed = true
70-
response.Body.Close()
71-
errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
72-
}
73-
wrc.Close()
74-
return
75-
}
76-
wrc.Set(response.Body)
77-
}()
78-
return writer, wrc
79-
}
80-
81-
func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
82-
reader, writer := io.Pipe()
83-
req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
84-
req.Header = c.transportConfig.GetRequestHeader()
85-
if !c.transportConfig.NoGRPCHeader {
86-
req.Header.Set("Content-Type", "application/grpc")
87-
}
88-
go func() {
89-
if resp, err := c.client.Do(req); err == nil {
90-
if resp.StatusCode != 200 {
91-
// c.closed = true
92-
}
93-
resp.Body.Close()
94-
}
95-
}()
96-
return writer
97-
}
98-
99-
func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
100-
var remoteAddr gonet.Addr
101-
var localAddr gonet.Addr
45+
func (c *DefaultDialerClient) OpenStream(ctx context.Context, url string, body io.Reader, uploadOnly bool) (wrc io.ReadCloser, remoteAddr, localAddr gonet.Addr, err error) {
10246
// this is done when the TCP/UDP connection to the server was established,
10347
// and we can unblock the Dial function and print correct net addresses in
10448
// logs
10549
gotConn := done.New()
50+
ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
51+
GotConn: func(connInfo httptrace.GotConnInfo) {
52+
remoteAddr = connInfo.Conn.RemoteAddr()
53+
localAddr = connInfo.Conn.LocalAddr()
54+
gotConn.Close()
55+
},
56+
})
10657

107-
var downResponse io.ReadCloser
108-
gotDownResponse := done.New()
109-
110-
ctx, ctxCancel := context.WithCancel(ctx)
58+
method := "GET"
59+
if body != nil {
60+
method = "POST"
61+
}
62+
req, _ := http.NewRequestWithContext(ctx, method, url, body)
63+
req.Header = c.transportConfig.GetRequestHeader()
64+
if method == "POST" && !c.transportConfig.NoGRPCHeader {
65+
req.Header.Set("Content-Type", "application/grpc")
66+
}
11167

68+
wrc = &WaitReadCloser{Wait: make(chan struct{})}
11269
go func() {
113-
trace := &httptrace.ClientTrace{
114-
GotConn: func(connInfo httptrace.GotConnInfo) {
115-
remoteAddr = connInfo.Conn.RemoteAddr()
116-
localAddr = connInfo.Conn.LocalAddr()
117-
gotConn.Close()
118-
},
119-
}
120-
121-
// in case we hit an error, we want to unblock this part
122-
defer gotConn.Close()
123-
124-
ctx = httptrace.WithClientTrace(ctx, trace)
125-
126-
req, err := http.NewRequestWithContext(
127-
ctx,
128-
"GET",
129-
baseURL,
130-
nil,
131-
)
132-
if err != nil {
133-
errors.LogInfoInner(ctx, err, "failed to construct download http request")
134-
gotDownResponse.Close()
135-
return
136-
}
137-
138-
req.Header = c.transportConfig.GetRequestHeader()
139-
140-
response, err := c.client.Do(req)
141-
gotConn.Close()
70+
resp, err := c.client.Do(req)
14271
if err != nil {
143-
errors.LogInfoInner(ctx, err, "failed to send download http request")
144-
gotDownResponse.Close()
72+
errors.LogInfoInner(ctx, err, "failed to "+method+" "+url)
73+
gotConn.Close()
74+
wrc.Close()
14575
return
14676
}
147-
148-
if response.StatusCode != 200 {
77+
if resp.StatusCode != 200 && !uploadOnly {
14978
// c.closed = true
150-
response.Body.Close()
151-
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
152-
gotDownResponse.Close()
79+
errors.LogInfo(ctx, "unexpected status ", resp.StatusCode)
80+
}
81+
if resp.StatusCode != 200 || uploadOnly {
82+
resp.Body.Close()
83+
wrc.Close()
15384
return
15485
}
155-
156-
downResponse = response.Body
157-
gotDownResponse.Close()
86+
wrc.(*WaitReadCloser).Set(resp.Body)
15887
}()
15988

16089
<-gotConn.Wait()
161-
162-
lazyDownload := &LazyReader{
163-
CreateReader: func() (io.Reader, error) {
164-
<-gotDownResponse.Wait()
165-
if downResponse == nil {
166-
return nil, errors.New("downResponse failed")
167-
}
168-
return downResponse, nil
169-
},
170-
}
171-
172-
// workaround for https://github.com/quic-go/quic-go/issues/2143 --
173-
// always cancel request context so that Close cancels any Read.
174-
// Should then match the behavior of http2 and http1.
175-
reader := downloadBody{
176-
lazyDownload,
177-
ctxCancel,
178-
}
179-
180-
return reader, remoteAddr, localAddr, nil
90+
return
18191
}
18292

183-
func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
184-
req, err := http.NewRequestWithContext(ctx, "POST", url, payload)
93+
func (c *DefaultDialerClient) PostPacket(ctx context.Context, url string, body io.Reader, contentLength int64) error {
94+
req, err := http.NewRequestWithContext(ctx, "POST", url, body)
18595
if err != nil {
18696
return err
18797
}
@@ -257,16 +167,6 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
257167
return nil
258168
}
259169

260-
type downloadBody struct {
261-
io.Reader
262-
cancel context.CancelFunc
263-
}
264-
265-
func (c downloadBody) Close() error {
266-
c.cancel()
267-
return nil
268-
}
269-
270170
type WaitReadCloser struct {
271171
Wait chan struct{}
272172
io.ReadCloser

transport/internet/splithttp/dialer.go

+15-29
Original file line numberDiff line numberDiff line change
@@ -343,29 +343,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
343343
errors.LogInfo(ctx, fmt.Sprintf("XHTTP is downloading from %s, mode %s, HTTP version %s, host %s", dest2, "stream-down", httpVersion2, requestURL2.Host))
344344
}
345345

346-
var writer io.WriteCloser
347-
var reader io.ReadCloser
348-
var remoteAddr, localAddr net.Addr
349-
var err error
350-
351-
if mode == "stream-one" {
352-
requestURL.Path = transportConfiguration.GetNormalizedPath()
353-
if xmuxClient != nil {
354-
xmuxClient.LeftRequests.Add(-1)
355-
}
356-
writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String())
357-
remoteAddr = &net.TCPAddr{}
358-
localAddr = &net.TCPAddr{}
359-
} else {
360-
if xmuxClient2 != nil {
361-
xmuxClient2.LeftRequests.Add(-1)
362-
}
363-
reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
364-
if err != nil {
365-
return nil, err
366-
}
367-
}
368-
369346
if xmuxClient != nil {
370347
xmuxClient.OpenUsage.Add(1)
371348
}
@@ -374,11 +351,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
374351
}
375352
var closed atomic.Int32
376353

354+
reader, writer := io.Pipe()
377355
conn := splitConn{
378-
writer: writer,
379-
reader: reader,
380-
remoteAddr: remoteAddr,
381-
localAddr: localAddr,
356+
writer: writer,
382357
onClose: func() {
383358
if closed.Add(1) > 1 {
384359
return
@@ -393,16 +368,27 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
393368
}
394369

395370
if mode == "stream-one" {
371+
requestURL.Path = transportConfiguration.GetNormalizedPath()
396372
if xmuxClient != nil {
397373
xmuxClient.LeftRequests.Add(-1)
398374
}
375+
conn.reader, conn.remoteAddr, conn.localAddr, _ = httpClient.OpenStream(context.WithoutCancel(ctx), requestURL.String(), reader, false)
399376
return stat.Connection(&conn), nil
377+
} else { // stream-down
378+
var err error
379+
if xmuxClient2 != nil {
380+
xmuxClient2.LeftRequests.Add(-1)
381+
}
382+
conn.reader, conn.remoteAddr, conn.localAddr, err = httpClient2.OpenStream(context.WithoutCancel(ctx), requestURL2.String(), nil, false)
383+
if err != nil { // browser dialer only
384+
return nil, err
385+
}
400386
}
401387
if mode == "stream-up" {
402388
if xmuxClient != nil {
403389
xmuxClient.LeftRequests.Add(-1)
404390
}
405-
conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
391+
httpClient.OpenStream(ctx, requestURL.String(), reader, true)
406392
return stat.Connection(&conn), nil
407393
}
408394

@@ -466,7 +452,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
466452
}
467453

468454
go func() {
469-
err := httpClient.SendUploadRequest(
455+
err := httpClient.PostPacket(
470456
context.WithoutCancel(ctx),
471457
url.String(),
472458
&buf.MultiBufferContainer{MultiBuffer: chunk},

transport/internet/splithttp/lazy_reader.go

-47
This file was deleted.

0 commit comments

Comments
 (0)