Skip to content

Commit 33acf3c

Browse files
committedMar 17, 2025·
Add multi buffer reader
1 parent 7c78408 commit 33acf3c

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed
 

‎transport/internet/quic/conn.go

+27-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type interConn struct {
1919
quicConn quic.Connection // small udp packet can be sent with Datagram directly
2020
streams []quic.Stream // other packets can be sent via steam, it offer mux, reliability, fragmentation and ordering
2121
readChannel chan readResult
22+
reader buf.MultiBufferContainer
2223
done *done.Instance
2324
local net.Addr
2425
remote net.Addr
@@ -34,6 +35,7 @@ func NewConnInitReader(ctx context.Context, quicConn quic.Connection, done *done
3435
ctx: ctx,
3536
quicConn: quicConn,
3637
readChannel: make(chan readResult),
38+
reader: buf.MultiBufferContainer{},
3739
done: done,
3840
local: quicConn.LocalAddr(),
3941
remote: remote,
@@ -81,13 +83,18 @@ func (c *interConn) acceptStreams() {
8183
}
8284

8385
func (c *interConn) Read(b []byte) (int, error) {
86+
if c.reader.MultiBuffer.Len() > 0 {
87+
return c.reader.Read(b)
88+
}
8489
received := <- c.readChannel
8590
if received.err != nil {
8691
return 0, received.err
8792
}
88-
nBytes := copy(b, received.buffer[:])
89-
errors.LogInfo(c.ctx, "Read copy ", nBytes)
90-
return nBytes, nil
93+
buffer := buf.New()
94+
buffer.Write(received.buffer)
95+
c.reader.MultiBuffer = append(c.reader.MultiBuffer, buffer)
96+
errors.LogInfo(c.ctx, "Read copy ", len(received.buffer))
97+
return c.reader.Read(b)
9198
}
9299

93100
func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
@@ -98,6 +105,23 @@ func (c *interConn) WriteMultiBuffer(mb buf.MultiBuffer) error {
98105
}
99106

100107
func (c *interConn) Write(b []byte) (int, error) {
108+
if len(b) > 1240 { // TODO: why quic-go increase internal MTU causing packet loss?
109+
if len(c.streams) < MaxIncomingStreams {
110+
stream, err := c.quicConn.OpenStream()
111+
errors.LogInfo(c.ctx, "Write OpenStream ", err)
112+
if err == nil {
113+
c.streams = append(c.streams, stream)
114+
} else {
115+
errors.LogInfoInner(c.ctx, err, "failed to openStream: ")
116+
}
117+
}
118+
currentStream++;
119+
if currentStream > len(c.streams) - 1 {
120+
currentStream = 0;
121+
}
122+
errors.LogInfo(c.ctx, "Write stream ", len(b), currentStream, len(c.streams))
123+
return c.streams[currentStream].Write(b)
124+
}
101125
var err = c.quicConn.SendDatagram(b)
102126
errors.LogInfo(c.ctx, "Write SendDatagram ", len(b), err)
103127
if _, ok := err.(*quic.DatagramTooLargeError); ok {

‎transport/internet/quic/quic_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func TestShortQuicConnection(t *testing.T) {
2222
testQuicConnection(t, 1024)
2323
}
2424

25+
func TestAroundMTUQuicConnection(t *testing.T) {
26+
testQuicConnection(t, 1247)
27+
}
28+
2529
func TestLongQuicConnection(t *testing.T) {
2630
testQuicConnection(t, 1500)
2731
}

0 commit comments

Comments
 (0)
Please sign in to comment.