Skip to content

Commit f2ffcc7

Browse files
authored
Merge pull request #28 from whyrusleeping/feat/deadline
Make deadlines interrupt and coalesce writes
2 parents 5364a42 + 5936fdb commit f2ffcc7

12 files changed

+550
-562
lines changed

LICENSE-BSD

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
Copyright (c) 2009 The Go Authors. All rights reserved.
2+
3+
Redistribution and use in source and binary forms, with or without
4+
modification, are permitted provided that the following conditions are
5+
met:
6+
7+
* Redistributions of source code must retain the above copyright
8+
notice, this list of conditions and the following disclaimer.
9+
* Redistributions in binary form must reproduce the above
10+
copyright notice, this list of conditions and the following disclaimer
11+
in the documentation and/or other materials provided with the
12+
distribution.
13+
* Neither the name of Google Inc. nor the names of its
14+
contributors may be used to endorse or promote products derived from
15+
this software without specific prior written permission.
16+
17+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18+
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19+
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20+
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21+
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22+
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23+
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24+
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25+
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26+
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

bench_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package yamux
22

33
import (
4+
"io"
45
"testing"
56
)
67

@@ -60,7 +61,7 @@ func BenchmarkSendRecv(b *testing.B) {
6061
}
6162
defer stream.Close()
6263
for i := 0; i < b.N; i++ {
63-
if _, err := stream.Read(recvBuf); err != nil {
64+
if _, err := io.ReadFull(stream, recvBuf); err != nil {
6465
b.Fatalf("err: %v", err)
6566
}
6667
}
@@ -101,7 +102,7 @@ func BenchmarkSendRecvLarge(b *testing.B) {
101102
defer stream.Close()
102103
for i := 0; i < b.N; i++ {
103104
for j := 0; j < sendSize/recvSize; j++ {
104-
if _, err := stream.Read(recvBuf); err != nil {
105+
if _, err := io.ReadFull(stream, recvBuf); err != nil {
105106
b.Fatalf("err: %v", err)
106107
}
107108
}

const.go

+27-16
Original file line numberDiff line numberDiff line change
@@ -6,59 +6,68 @@ import (
66
)
77

88
type YamuxError struct {
9-
msg string
9+
msg string
10+
timeout, temporary bool
1011
}
1112

1213
func (ye YamuxError) Error() string {
1314
return ye.msg
1415
}
1516

17+
func (ye YamuxError) Timeout() bool {
18+
return ye.timeout
19+
}
20+
21+
func (ye YamuxError) Temporary() bool {
22+
return ye.temporary
23+
}
24+
1625
var (
1726
// ErrInvalidVersion means we received a frame with an
1827
// invalid version
19-
ErrInvalidVersion = &YamuxError{"invalid protocol version"}
28+
ErrInvalidVersion = &YamuxError{msg: "invalid protocol version"}
2029

2130
// ErrInvalidMsgType means we received a frame with an
2231
// invalid message type
23-
ErrInvalidMsgType = &YamuxError{"invalid msg type"}
32+
ErrInvalidMsgType = &YamuxError{msg: "invalid msg type"}
2433

2534
// ErrSessionShutdown is used if there is a shutdown during
2635
// an operation
27-
ErrSessionShutdown = &YamuxError{"session shutdown"}
36+
ErrSessionShutdown = &YamuxError{msg: "session shutdown"}
2837

2938
// ErrStreamsExhausted is returned if we have no more
3039
// stream ids to issue
31-
ErrStreamsExhausted = &YamuxError{"streams exhausted"}
40+
ErrStreamsExhausted = &YamuxError{msg: "streams exhausted"}
3241

3342
// ErrDuplicateStream is used if a duplicate stream is
3443
// opened inbound
35-
ErrDuplicateStream = &YamuxError{"duplicate stream initiated"}
44+
ErrDuplicateStream = &YamuxError{msg: "duplicate stream initiated"}
3645

3746
// ErrReceiveWindowExceeded indicates the window was exceeded
38-
ErrRecvWindowExceeded = &YamuxError{"recv window exceeded"}
47+
ErrRecvWindowExceeded = &YamuxError{msg: "recv window exceeded"}
3948

4049
// ErrTimeout is used when we reach an IO deadline
41-
ErrTimeout = &YamuxError{"i/o deadline reached"}
50+
ErrTimeout = &YamuxError{msg: "i/o deadline reached", timeout: true, temporary: true}
4251

4352
// ErrStreamClosed is returned when using a closed stream
44-
ErrStreamClosed = &YamuxError{"stream closed"}
53+
ErrStreamClosed = &YamuxError{msg: "stream closed"}
4554

4655
// ErrUnexpectedFlag is set when we get an unexpected flag
47-
ErrUnexpectedFlag = &YamuxError{"unexpected flag"}
56+
ErrUnexpectedFlag = &YamuxError{msg: "unexpected flag"}
4857

4958
// ErrRemoteGoAway is used when we get a go away from the other side
50-
ErrRemoteGoAway = &YamuxError{"remote end is not accepting connections"}
59+
ErrRemoteGoAway = &YamuxError{msg: "remote end is not accepting connections"}
5160

5261
// ErrConnectionReset is sent if a stream is reset. This can happen
5362
// if the backlog is exceeded, or if there was a remote GoAway.
54-
ErrConnectionReset = &YamuxError{"stream reset"}
63+
ErrConnectionReset = &YamuxError{msg: "stream reset"}
5564

5665
// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
5766
// timeout writing to the underlying stream connection.
58-
ErrConnectionWriteTimeout = &YamuxError{"connection write timeout"}
67+
ErrConnectionWriteTimeout = &YamuxError{msg: "connection write timeout", timeout: true}
5968

6069
// ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
61-
ErrKeepAliveTimeout = &YamuxError{"keepalive timeout"}
70+
ErrKeepAliveTimeout = &YamuxError{msg: "keepalive timeout", timeout: true}
6271
)
6372

6473
const (
@@ -129,7 +138,7 @@ const (
129138
sizeOfStreamID + sizeOfLength
130139
)
131140

132-
type header []byte
141+
type header [headerSize]byte
133142

134143
func (h header) Version() uint8 {
135144
return h[0]
@@ -156,10 +165,12 @@ func (h header) String() string {
156165
h.Version(), h.MsgType(), h.Flags(), h.StreamID(), h.Length())
157166
}
158167

159-
func (h header) encode(msgType uint8, flags uint16, streamID uint32, length uint32) {
168+
func encode(msgType uint8, flags uint16, streamID uint32, length uint32) header {
169+
var h header
160170
h[0] = protoVersion
161171
h[1] = msgType
162172
binary.BigEndian.PutUint16(h[2:4], flags)
163173
binary.BigEndian.PutUint32(h[4:8], streamID)
164174
binary.BigEndian.PutUint32(h[8:12], length)
175+
return h
165176
}

const_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ func TestConst(t *testing.T) {
5151
}
5252

5353
func TestEncodeDecode(t *testing.T) {
54-
hdr := header(make([]byte, headerSize))
55-
hdr.encode(typeWindowUpdate, flagACK|flagRST, 1234, 4321)
54+
hdr := encode(typeWindowUpdate, flagACK|flagRST, 1234, 4321)
5655

5756
if hdr.Version() != protoVersion {
5857
t.Fatalf("bad: %v", hdr)

deadline.go

+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copied from the go standard library.
2+
//
3+
// Copyright 2010 The Go Authors. All rights reserved.
4+
// Use of this source code is governed by a BSD-style
5+
// license that can be found in the LICENSE-BSD file.
6+
7+
package yamux
8+
9+
import (
10+
"sync"
11+
"time"
12+
)
13+
14+
// pipeDeadline is an abstraction for handling timeouts.
15+
type pipeDeadline struct {
16+
mu sync.Mutex // Guards timer and cancel
17+
timer *time.Timer
18+
cancel chan struct{} // Must be non-nil
19+
}
20+
21+
func makePipeDeadline() pipeDeadline {
22+
return pipeDeadline{cancel: make(chan struct{})}
23+
}
24+
25+
// set sets the point in time when the deadline will time out.
26+
// A timeout event is signaled by closing the channel returned by waiter.
27+
// Once a timeout has occurred, the deadline can be refreshed by specifying a
28+
// t value in the future.
29+
//
30+
// A zero value for t prevents timeout.
31+
func (d *pipeDeadline) set(t time.Time) {
32+
d.mu.Lock()
33+
defer d.mu.Unlock()
34+
35+
if d.timer != nil && !d.timer.Stop() {
36+
<-d.cancel // Wait for the timer callback to finish and close cancel
37+
}
38+
d.timer = nil
39+
40+
// Time is zero, then there is no deadline.
41+
closed := isClosedChan(d.cancel)
42+
if t.IsZero() {
43+
if closed {
44+
d.cancel = make(chan struct{})
45+
}
46+
return
47+
}
48+
49+
// Time in the future, setup a timer to cancel in the future.
50+
if dur := time.Until(t); dur > 0 {
51+
if closed {
52+
d.cancel = make(chan struct{})
53+
}
54+
d.timer = time.AfterFunc(dur, func() {
55+
close(d.cancel)
56+
})
57+
return
58+
}
59+
60+
// Time in the past, so close immediately.
61+
if !closed {
62+
close(d.cancel)
63+
}
64+
}
65+
66+
// wait returns a channel that is closed when the deadline is exceeded.
67+
func (d *pipeDeadline) wait() chan struct{} {
68+
d.mu.Lock()
69+
defer d.mu.Unlock()
70+
return d.cancel
71+
}
72+
73+
func isClosedChan(c <-chan struct{}) bool {
74+
select {
75+
case <-c:
76+
return true
77+
default:
78+
return false
79+
}
80+
}

go.mod

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module github.com/whyrusleeping/yamux
2+
3+
go 1.12
4+
5+
require github.com/libp2p/go-buffer-pool v0.0.1

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
github.com/libp2p/go-buffer-pool v0.0.1 h1:9Rrn/H46cXjaA2HQ5Y8lyhOS1NhTkZ4yuEs2r3Eechg=
2+
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=

mux.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package yamux
33
import (
44
"fmt"
55
"io"
6+
"net"
67
"os"
78
"time"
89
)
@@ -37,6 +38,15 @@ type Config struct {
3738
//
3839
// Set to 0 to disable it.
3940
ReadBufSize int
41+
42+
// WriteCoalesceDelay is the maximum amount of time we'll delay
43+
// coalescing a packet before sending it. This should be on the order of
44+
// micro-milliseconds.
45+
WriteCoalesceDelay time.Duration
46+
47+
// MaxMessageSize is the maximum size of a message that we'll send on a
48+
// stream. This ensures that a single stream doesn't hog a connection.
49+
MaxMessageSize uint32
4050
}
4151

4252
// DefaultConfig is used to return a default configuration
@@ -49,6 +59,8 @@ func DefaultConfig() *Config {
4959
MaxStreamWindowSize: initialStreamWindow,
5060
LogOutput: os.Stderr,
5161
ReadBufSize: 4096,
62+
MaxMessageSize: 64 * 1024, // Means 64KiB/10s = 52kbps minimum speed.
63+
WriteCoalesceDelay: 10 * time.Microsecond,
5264
}
5365
}
5466

@@ -63,13 +75,19 @@ func VerifyConfig(config *Config) error {
6375
if config.MaxStreamWindowSize < initialStreamWindow {
6476
return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
6577
}
78+
if config.MaxMessageSize < 1024 {
79+
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
80+
}
81+
if config.WriteCoalesceDelay < 0 {
82+
return fmt.Errorf("WriteCoalesceDelay must be >= 0")
83+
}
6684
return nil
6785
}
6886

6987
// Server is used to initialize a new server-side connection.
7088
// There must be at most one server-side connection. If a nil config is
7189
// provided, the DefaultConfiguration will be used.
72-
func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
90+
func Server(conn net.Conn, config *Config) (*Session, error) {
7391
if config == nil {
7492
config = DefaultConfig()
7593
}
@@ -81,7 +99,7 @@ func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
8199

82100
// Client is used to initialize a new client-side connection.
83101
// There must be at most one client-side connection.
84-
func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
102+
func Client(conn net.Conn, config *Config) (*Session, error) {
85103
if config == nil {
86104
config = DefaultConfig()
87105
}

0 commit comments

Comments
 (0)