Skip to content

Commit 129b2be

Browse files
committed
Use capsulate protocol for large UDP packet
- make datagram transport without mux functionality - it is now recommended to always pair with mux-cool (XUDP new tunnel non-zero session id)
1 parent 358bdc2 commit 129b2be

File tree

4 files changed

+140
-89
lines changed

4 files changed

+140
-89
lines changed

transport/internet/quic/conn.go

+122-14
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,83 @@ import (
66

77
"github.com/quic-go/quic-go"
88
"github.com/xtls/xray-core/common/buf"
9+
"github.com/xtls/xray-core/common/errors"
910
"github.com/xtls/xray-core/common/net"
11+
"github.com/xtls/xray-core/common/signal/done"
1012
)
1113

14+
var MaxIncomingStreams = 2
15+
var currentStream = 0
16+
1217
type interConn struct {
13-
ctx context.Context
14-
quicConn quic.Connection
15-
local net.Addr
16-
remote net.Addr
18+
ctx context.Context
19+
quicConn quic.Connection // small udp packet can be sent with Datagram directly
20+
streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering
21+
readChannel chan readResult
22+
done *done.Instance
23+
local net.Addr
24+
remote net.Addr
25+
}
26+
27+
type readResult struct {
28+
buffer []byte
29+
err error
30+
}
31+
32+
func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done.Instance, remote net.Addr) *interConn {
33+
c := &interConn{
34+
ctx: ctx,
35+
quicConn: quicConn,
36+
readChannel: make(chan readResult),
37+
done: done,
38+
local: quicConn.LocalAddr(),
39+
remote: remote,
40+
}
41+
go func() {
42+
for {
43+
received, e := c.quicConn.ReceiveDatagram(c.ctx)
44+
c.readChannel <- readResult{buffer: received, err: e}
45+
}
46+
}()
47+
go c.acceptStreams()
48+
return c
1749
}
1850

19-
func (c *interConn) Read(b []byte) (int, error) {
20-
received, e := c.quicConn.ReceiveDatagram(c.ctx)
21-
if e != nil {
22-
return 0, e
51+
func (c *interConn) acceptStreams() {
52+
for {
53+
stream, err := c.quicConn.AcceptStream(context.Background())
54+
if err != nil {
55+
errors.LogInfoInner(context.Background(), err, "failed to accept stream")
56+
select {
57+
case <-c.quicConn.Context().Done():
58+
return
59+
case <-c.done.Wait():
60+
if err := c.quicConn.CloseWithError(0, ""); err != nil {
61+
errors.LogInfoInner(context.Background(), err, "failed to close connection")
62+
}
63+
return
64+
default:
65+
time.Sleep(time.Second)
66+
continue
67+
}
68+
}
69+
go func() {
70+
for {
71+
received := make([]byte, buf.Size)
72+
i, e := stream.Read(received)
73+
c.readChannel <- readResult{buffer: received[:i], err: e}
74+
}
75+
}()
76+
c.streams = append(c.streams, stream)
2377
}
24-
nBytes := copy(b, received[:])
78+
}
79+
80+
func (c *interConn) Read(b []byte) (int, error) {
81+
received := <- c.readChannel
82+
if received.err != nil {
83+
return 0, received.err
84+
}
85+
nBytes := copy(b, received.buffer[:])
2586
return nBytes, nil
2687
}
2788

@@ -33,11 +94,37 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
3394
}
3495

3596
func (c *interConn) Write(b []byte) (int, error) {
36-
return len(b), c.quicConn.SendDatagram(b)
97+
var err = c.quicConn.SendDatagram(b)
98+
if _, ok := err.(*quic.DatagramTooLargeError); ok {
99+
if len(c.streams) < MaxIncomingStreams {
100+
stream, err := c.quicConn.OpenStream()
101+
if err == nil {
102+
c.streams = append(c.streams, stream)
103+
} else {
104+
errors.LogInfoInner(c.ctx, err, "failed to openStream: ")
105+
}
106+
}
107+
currentStream++;
108+
if currentStream > len(c.streams) - 1 {
109+
currentStream = 0;
110+
}
111+
return c.streams[currentStream].Write(b)
112+
}
113+
if err != nil {
114+
return 0, err
115+
}
116+
return len(b), nil
37117
}
38118

39119
func (c *interConn) Close() error {
40-
return nil
120+
var err error
121+
for _, s := range c.streams {
122+
e := s.Close()
123+
if e != nil {
124+
err = e
125+
}
126+
}
127+
return err
41128
}
42129

43130
func (c *interConn) LocalAddr() net.Addr {
@@ -49,13 +136,34 @@ func (c *interConn) RemoteAddr() net.Addr {
49136
}
50137

51138
func (c *interConn) SetDeadline(t time.Time) error {
52-
return nil
139+
var err error
140+
for _, s := range c.streams {
141+
e := s.SetDeadline(t)
142+
if e != nil {
143+
err = e
144+
}
145+
}
146+
return err
53147
}
54148

55149
func (c *interConn) SetReadDeadline(t time.Time) error {
56-
return nil
150+
var err error
151+
for _, s := range c.streams {
152+
e := s.SetReadDeadline(t)
153+
if e != nil {
154+
err = e
155+
}
156+
}
157+
return err
57158
}
58159

59160
func (c *interConn) SetWriteDeadline(t time.Time) error {
60-
return nil
161+
var err error
162+
for _, s := range c.streams {
163+
e := s.SetWriteDeadline(t)
164+
if e != nil {
165+
err = e
166+
}
167+
}
168+
return err
61169
}

transport/internet/quic/dialer.go

+4-63
Original file line numberDiff line numberDiff line change
@@ -2,38 +2,18 @@ package quic
22

33
import (
44
"context"
5-
"sync"
65
"time"
76

87
"github.com/quic-go/quic-go"
98
"github.com/xtls/xray-core/common"
109
"github.com/xtls/xray-core/common/errors"
1110
"github.com/xtls/xray-core/common/net"
11+
"github.com/xtls/xray-core/common/signal/done"
1212
"github.com/xtls/xray-core/transport/internet"
1313
"github.com/xtls/xray-core/transport/internet/stat"
1414
"github.com/xtls/xray-core/transport/internet/tls"
1515
)
1616

17-
type connectionContext struct {
18-
rawConn *net.UDPConn
19-
conn quic.Connection
20-
}
21-
22-
type clientConnections struct {
23-
access sync.Mutex
24-
conns map[net.Destination][]*connectionContext
25-
// cleanup *task.Periodic
26-
}
27-
28-
func isActive(s quic.Connection) bool {
29-
select {
30-
case <-s.Context().Done():
31-
return false
32-
default:
33-
return true
34-
}
35-
}
36-
3717
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
3818
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
3919
if tlsConfig == nil {
@@ -68,38 +48,11 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
6848

6949
config := streamSettings.ProtocolSettings.(*Config)
7050

71-
return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
51+
return openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
7252
}
7353

74-
func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
75-
s.access.Lock()
76-
defer s.access.Unlock()
77-
78-
if s.conns == nil {
79-
s.conns = make(map[net.Destination][]*connectionContext)
80-
}
81-
54+
func openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
8255
dest := net.DestinationFromAddr(destAddr)
83-
84-
var conns []*connectionContext
85-
if s, found := s.conns[dest]; found {
86-
conns = s
87-
}
88-
89-
if len(conns) > 0 {
90-
s := conns[len(conns)-1]
91-
if isActive(s.conn) {
92-
return &interConn{
93-
ctx: ctx,
94-
quicConn: s.conn,
95-
local: s.conn.LocalAddr(),
96-
remote: destAddr,
97-
}, nil
98-
} else {
99-
errors.LogInfo(ctx, "current quic connection is not active!")
100-
}
101-
}
102-
10356
errors.LogInfo(ctx, "dialing quic to ", dest)
10457
rawConn, err := internet.DialSystem(ctx, dest, sockopt)
10558
if err != nil {
@@ -134,21 +87,9 @@ func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Add
13487
return nil, err
13588
}
13689

137-
context := &connectionContext{
138-
conn: conn,
139-
rawConn: udpConn,
140-
}
141-
s.conns[dest] = append(conns, context)
142-
return &interConn{
143-
ctx: ctx,
144-
quicConn: context.conn,
145-
local: context.conn.LocalAddr(),
146-
remote: destAddr,
147-
}, nil
90+
return NewConnInitReader(ctx, conn, done.New(), destAddr), nil
14891
}
14992

150-
var client clientConnections
151-
15293
func init() {
15394
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
15495
}

transport/internet/quic/hub.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ func (l *Listener) keepAccepting(ctx context.Context) {
3333
time.Sleep(time.Second)
3434
continue
3535
}
36-
l.addConn(&interConn{
37-
ctx: ctx,
38-
quicConn: conn,
39-
local: conn.LocalAddr(),
40-
remote: conn.RemoteAddr(),
41-
})
36+
l.addConn(NewConnInitReader(ctx, conn, l.done, conn.RemoteAddr()))
4237
}
4338
}
4439

@@ -81,7 +76,7 @@ func Listen(ctx context.Context, address net.Address, port net.Port, streamSetti
8176
KeepAlivePeriod: 0,
8277
HandshakeIdleTimeout: time.Second * 8,
8378
MaxIdleTimeout: time.Second * 300,
84-
MaxIncomingStreams: 32,
79+
MaxIncomingStreams: 2,
8580
MaxIncomingUniStreams: -1,
8681
EnableDatagrams: true,
8782
}

transport/internet/quic/quic_test.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,15 @@ import (
1818
"github.com/xtls/xray-core/transport/internet/tls"
1919
)
2020

21-
func TestQuicConnection(t *testing.T) {
21+
func TestShortQuicConnection(t *testing.T) {
22+
testQuicConnection(t, 1024)
23+
}
24+
25+
func TestLongQuicConnection(t *testing.T) {
26+
testQuicConnection(t, 1500)
27+
}
28+
29+
func testQuicConnection(t *testing.T, dataLen int32) {
2230
port := udp.PickPort()
2331

2432
listener, err := quic.Listen(context.Background(), net.LocalHostIP, port, &internet.MemoryStreamConfig{
@@ -69,23 +77,22 @@ func TestQuicConnection(t *testing.T) {
6977
common.Must(err)
7078
defer conn.Close()
7179

72-
const N = 1024
73-
b1 := make([]byte, N)
80+
b1 := make([]byte, dataLen)
7481
common.Must2(rand.Read(b1))
7582
b2 := buf.New()
7683

7784
common.Must2(conn.Write(b1))
7885

7986
b2.Clear()
80-
common.Must2(b2.ReadFullFrom(conn, N))
87+
common.Must2(b2.ReadFullFrom(conn, dataLen))
8188
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
8289
t.Error(r)
8390
}
8491

8592
common.Must2(conn.Write(b1))
8693

8794
b2.Clear()
88-
common.Must2(b2.ReadFullFrom(conn, N))
95+
common.Must2(b2.ReadFullFrom(conn, dataLen))
8996
if r := cmp.Diff(b2.Bytes(), b1); r != "" {
9097
t.Error(r)
9198
}

0 commit comments

Comments
 (0)