Make deadlines interrupt and coalesce writes #28
Conversation
These are really small so there's no point in using slices. These slices are currently amortized by reusing them but we can't _always_ do that.
Write always writes everything
Creating a new byte reader is a waste of an allocation.
1. Deadlines now apply to in-progress operations.
2. The ConnectionWriteTimeout now actually does what it says it does: when we fail to write fast enough, it kills the entire connection.
3. Make all writes async.
   a. We can't abort after a partial write anyways.
   b. The network is buffered so it doesn't matter.
   c. This will help us coalesce writes.
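To make point 1 concrete, here's a hedged caller-side sketch (not from the PR; `session` and `hugeBlob` are placeholders) of what "deadlines apply to in-progress operations" means:

```go
// demoInterrupt: a deadline set while a Write is already blocked should
// now interrupt that Write, instead of only applying to future calls.
func demoInterrupt(session *yamux.Session, hugeBlob []byte) error {
	stream, err := session.OpenStream()
	if err != nil {
		return err
	}
	go func() {
		time.Sleep(10 * time.Millisecond)
		// A zero/past deadline interrupts the in-flight Write.
		_ = stream.SetWriteDeadline(time.Now())
	}()
	_, err = stream.Write(hugeBlob)
	return err // expect a timeout error rather than a hang
}
```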
//
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE-BSD file.
Stolen from the standard library, no need to review much.
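For reference, the borrowed helper is essentially the `pipeDeadline` type from Go's `net/pipe.go`; a reproduction follows (check the BSD-licensed original for the authoritative version):

```go
// pipeDeadline is an abstraction for handling timeouts.
type pipeDeadline struct {
	mu     sync.Mutex // Guards timer and cancel
	timer  *time.Timer
	cancel chan struct{} // Must be non-nil
}

func makePipeDeadline() pipeDeadline {
	return pipeDeadline{cancel: make(chan struct{})}
}

// set sets the point in time when the deadline will time out.
// A timeout event is signaled by closing the channel returned by wait.
// A zero value for t prevents timeout.
func (d *pipeDeadline) set(t time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.timer != nil && !d.timer.Stop() {
		<-d.cancel // Wait for the timer callback to finish and close cancel
	}
	d.timer = nil

	// Time is zero, then there is no deadline.
	closed := isClosedChan(d.cancel)
	if t.IsZero() {
		if closed {
			d.cancel = make(chan struct{})
		}
		return
	}

	// Time in the future: set up a timer to cancel in the future.
	if dur := time.Until(t); dur > 0 {
		if closed {
			d.cancel = make(chan struct{})
		}
		d.timer = time.AfterFunc(dur, func() {
			close(d.cancel)
		})
		return
	}

	// Time in the past, so close immediately.
	if !closed {
		close(d.cancel)
	}
}

// wait returns a channel that is closed when the deadline is exceeded.
func (d *pipeDeadline) wait() chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.cancel
}

func isClosedChan(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}
```

Setting a zero time (as the stream-cleanup paths below do) stops any pending timer, which is what "avoid leaking timers" refers to.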
@@ -69,7 +70,7 @@ func VerifyConfig(config *Config) error {
 // Server is used to initialize a new server-side connection.
 // There must be at most one server-side connection. If a nil config is
 // provided, the DefaultConfiguration will be used.
-func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
+func Server(conn net.Conn, config *Config) (*Session, error) {
A breaking change for yamux but libp2p doesn't care.
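Hedged usage note: callers now have to hand the session a real `net.Conn`, e.g. straight off a listener (a sketch, not from the PR):

```go
func serveOne(l net.Listener) (*yamux.Session, error) {
	conn, err := l.Accept() // any real net.Conn satisfies the new signature
	if err != nil {
		return nil, err
	}
	return yamux.Server(conn, nil) // nil config => the default configuration
}
```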
@@ -169,7 +161,7 @@ func (s *Session) Open() (net.Conn, error) { | |||
// OpenStream is used to create a new stream | |||
func (s *Session) OpenStream() (*Stream, error) { | |||
if s.IsClosed() { | |||
return nil, ErrSessionShutdown | |||
return nil, s.shutdownErr |
This exposes the connection timeout error. I'm happy to revert this, but a bunch of tests were complaining that `Write` was returning `ErrSessionShutdown` instead of `ErrConnectionWriteTimeout`, so I just fixed this everywhere.
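For context, a sketch of recording the first fatal error so later calls surface the real cause (this mirrors yamux's existing `exitErr` shape, but treat it as a sketch):

```go
// exitErr records the first fatal error before shutting down, so that
// OpenStream/Write can return the real cause (e.g. the write timeout)
// instead of a generic ErrSessionShutdown.
func (s *Session) exitErr(err error) {
	s.shutdownLock.Lock()
	if s.shutdownErr == nil {
		s.shutdownErr = err
	}
	s.shutdownLock.Unlock()
	s.Close()
}
```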
@@ -275,14 +267,13 @@ func (s *Session) exitErr(err error) { | |||
// GoAway can be used to prevent accepting further | |||
// connections. It does not close the underlying conn. | |||
func (s *Session) GoAway() error { | |||
return s.waitForSend(s.goAway(goAwayNormal), nil) | |||
return s.sendMsg(s.goAway(goAwayNormal), nil, nil) |
There is now only one way to send things (all sends are async, like the network, dammit!).
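A hedged sketch of what that single path might look like; `sendCh`, `shutdownCh`, and the `deadline` channel are assumed names, not necessarily the PR's:

```go
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
	// Copy header+body into one pooled buffer so the caller may return
	// (and reuse its own buffer) while the send loop is still writing.
	buf := pool.Get(headerSize + len(body))
	copy(buf[:headerSize], hdr[:])
	copy(buf[headerSize:], body)

	select {
	case s.sendCh <- buf: // "success" means enqueued, not on the wire
		return nil
	case <-deadline:
		pool.Put(buf)
		return ErrTimeout
	case <-s.shutdownCh:
		pool.Put(buf)
		return s.shutdownErr
	}
}
```

Note that "success" here means enqueued, not on the wire, which is exactly the async semantics described above.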
	// duplicate as we're sending this async.
	buf := pool.Get(headerSize + len(body))
	copy(buf[:headerSize], hdr[:])
	copy(buf[headerSize:], body)
This is the most annoying part of this patch for me. However:
- To implement per-stream deadlines correctly, we need to copy the data before writing. Otherwise, we can't return while the write is still in-progress (we don't want to just kill the entire connection).
- The buffer pool is actually pretty fast.
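For illustration only, a toy pool along these lines amortizes the copy's allocations (the PR uses a shared, size-bucketed pool instead):

```go
var buffers = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 64*1024)
		return &b // a pointer avoids an extra allocation on Put
	},
}

func getBuf(n int) []byte {
	bp := buffers.Get().(*[]byte)
	if cap(*bp) < n {
		return make([]byte, n) // rare: caller wants more than a pooled buffer holds
	}
	return (*bp)[:n]
}

func putBuf(b []byte) {
	b = b[:cap(b)]
	buffers.Put(&b)
}
```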
yeah, it's ok.
	writer:       write1,
	writeBlocker: make(chan struct{}, 1),
	closeCh:      make(chan struct{}, 1),

func testConn() (conn1, conn2 net.Conn) {
These tests made a lot of assumptions about the synchronous nature of this connection. I've had to rework most of them to handle the fact that all writes are now async.
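One hedged way (not necessarily the PR's) to get a genuinely buffered `net.Conn` pair for such tests is a loopback TCP pair; `net.Pipe` is fully synchronous, which is exactly the assumption the old tests baked in:

```go
func testConn(t *testing.T) (net.Conn, net.Conn) {
	t.Helper()
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer l.Close()

	type result struct {
		conn net.Conn
		err  error
	}
	accepted := make(chan result, 1)
	go func() {
		c, err := l.Accept()
		accepted <- result{c, err}
	}()

	conn1, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		t.Fatal(err)
	}
	r := <-accepted
	if r.err != nil {
		t.Fatal(r.err)
	}
	return conn1, r.conn
}
```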
@@ -360,6 +319,16 @@ func (s *Stream) forceClose() { | |||
} | |||
s.stateLock.Unlock() | |||
s.notifyWaiting() | |||
|
|||
s.readDeadline.set(time.Time{}) | |||
s.readDeadline.set(time.Time{}) |
Avoid leaking timers.
func (s *Stream) cleanup() {
	s.session.closeStream(s.id)
	s.readDeadline.set(time.Time{})
	s.writeDeadline.set(time.Time{})
Also avoid leaking timers.
	defer s.stateLock.Unlock()
	switch s.state {
	case streamClosed, streamRemoteClose, streamReset:
		return nil
Really, don't leak timers.
* Ensures one stream doesn't completely starve the others when writing a large blob.
* Ensures we don't think we're stalling on write when we're actually writing a ton of data.
* Ensures we don't queue more than 1MiB in the write channel.
1. Bump to 64KiB. That's large enough to not be a performance issue but still small enough that not writing 64KiB within the connection write timeout means something is very wrong.
2. Remove the header size from this max message size so we get a nice round number (see the next patch).
1. This allows us to combine the header and body before writing and helps us avoid really tiny packets.
2. We _need_ to copy as we're now sending entirely async. The buffer pool should pretty much erase the overhead of this copy.
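Illustratively, the resulting framing constants might look like this (constant names are assumptions; the 12-byte header layout comes from the yamux spec):

```go
const (
	// headerSize is the fixed yamux frame header:
	// version(1) + type(1) + flags(2) + streamID(4) + length(4).
	headerSize = 12

	// maxMessageSize caps the *body* only, so the cap is a round number
	// and a full frame is headerSize+maxMessageSize bytes on the wire.
	maxMessageSize = 64 << 10 // 64KiB
)
```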
cc @vyzo.
It seems that a little delay does help. I would say 1ms is not a big deal, and it goes a long way towards the goal of reducing packet counts.
1ms is a pretty big deal on a local network. I'd prefer to ship something simpler first and then play with configurable delays so we have a better base to test against.
This significantly impacts local benchmarks but those are pretty synthetic anyways. This shouldn't make any difference on an actual network.
LGTM overall, a couple of nits.
@@ -0,0 +1,5 @@
+module github.com/whyrusleeping/yamux
+
+go 1.12
that was probably auto-added when you built, but I don't think we mandate it yet.
It doesn't really make a difference. It just means the `go` command will warn the user if the compile fails.
> The go directive in a go.mod file now indicates the version of the language used by the files within that module. It will be set to the current release (go 1.12) if no existing version is present. If the go directive for a module specifies a version newer than the toolchain in use, the go command will attempt to build the packages regardless, and will note the mismatch only if that build fails.
session.go (Outdated)

	}

	writer = getBuffer(s.conn)
	writeTimeout.Reset(100 * time.Microsecond)
shouldn't we `Stop` the timer and possibly drain the channel before resetting?
The timer has to have fired for us to get to this point.
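For reference, the general-purpose pattern vyzo is alluding to looks like this (per the `time.Timer` documentation):

```go
// safeReset: required when you *don't* know whether the timer fired or
// whether its channel was already drained.
func safeReset(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C: // drain a fire that raced with Stop
		default: // channel was already drained elsewhere
		}
	}
	t.Reset(d)
}
```

On this code path, though, the only way to reach the `Reset` is via `<-writeTimeout.C`, so the timer has already fired and its channel is already drained; a bare `Reset` is safe.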
(sorry for the monster PR, these changes turned into a bit of a rabbit hole)
First up, this patch correctly implements deadlines:

1. Deadlines now apply to in-progress operations; among other things, this lets a deadline interrupt `Close()` in the circuit transport.
2. The `ConnectionWriteTimeout` now actually does what it says it does: when we fail to write fast enough, it kills the entire connection.
3. All writes are now async:
   a. We can't abort after a partial write anyways but we shouldn't block the stream write until we finish writing to the underlying connection. The simple solution is to claim that the write succeeds when we enqueue it. This is fine as network writes are buffered anyways.
   b. The network is buffered so it doesn't matter.
   c. We need this to coalesce/buffer writes to reduce packet count/secio overhead.
Next, this PR coalesces writes from any number of streams.
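A hedged sketch of the coalescing idea. The PR's actual loop uses a pooled buffered writer plus a short flush timer (see the `getBuffer`/`writeTimeout` diff above); this simplified version flushes whenever the queue goes idle, and all names are assumptions:

```go
func (s *Session) sendLoop() {
	writer := bufio.NewWriter(s.conn)
	for {
		var buf []byte
		select {
		case buf = <-s.sendCh:
		default:
			// Queue is idle: flush whatever frames we've coalesced so far.
			if err := writer.Flush(); err != nil {
				s.exitErr(err)
				return
			}
			select {
			case buf = <-s.sendCh:
			case <-s.shutdownCh:
				return
			}
		}
		if _, err := writer.Write(buf); err != nil {
			s.exitErr(err)
			return
		}
		pool.Put(buf) // frame is copied into the bufio buffer; recycle it
	}
}
```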
Finally, this PR breaks large writes into 16KiB chunks (see the sketch below) to:

* Ensure one stream doesn't completely starve the others when writing a large blob.
* Ensure we don't think we're stalling on write when we're actually writing a ton of data.
* Ensure we don't queue more than 1MiB in the write channel.
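A hedged sketch of the chunked write (`sendData` is an assumed helper; per the review discussion above, the chunk size was later bumped to 64KiB):

```go
func (s *Stream) Write(b []byte) (int, error) {
	const chunkSize = 16 * 1024 // later bumped to 64KiB in review
	total := 0
	for len(b) > 0 {
		n := len(b)
		if n > chunkSize {
			n = chunkSize
		}
		// Each chunk becomes one frame; between chunks, other streams
		// get a chance to enqueue their own frames.
		if err := s.sendData(b[:n]); err != nil {
			return total, err
		}
		total += n
		b = b[n:]
	}
	return total, nil
}
```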
Breaking:

* Sessions now require a `net.Conn` (for deadlines). This shouldn't break our code as our stream muxer interfaces use `net.Conn` anyways.

Tradeoffs:
This patch splits writes into small (64KiB) chunks and copies them into temporary buffers (using a buffer pool). We have to do this as we now write async.
This impacts the built-in benchmarks quite a bit but, IMO, those are rather synthetic and don't reflect actual network conditions.
Coalesce Delays:
I've played with introducing an additional delay to try to coalesce writes a bit more. To test this, I tried connecting two IPFS nodes and counted the number of packets sent by both nodes. My results are:
Given this, I feel like we can mess with delays later if necessary but this is a good start (and the patch is already way too large).