From f772aae9f7223ad69e1c68459cfdf56f1ebf6023 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 11 Dec 2017 12:07:57 -0800 Subject: [PATCH 1/4] don't hold the lock when closing In libp2p, Close is assumed to be threadsafe and we'd like to interrupt in-progress reads/writes. As a matter of fact, we're lucky this hasn't caused close to hang. If we had tried to close the reader before closing the writer, we would have blocked on a concurrent read call. Part of ipfs/go-ipfs#2823 --- msgio.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/msgio.go b/msgio.go index 52b3240..9ffb426 100644 --- a/msgio.go +++ b/msgio.go @@ -105,9 +105,6 @@ func (s *writer) WriteMsg(msg []byte) (err error) { } func (s *writer) Close() error { - s.lock.Lock() - defer s.lock.Unlock() - if c, ok := s.W.(io.Closer); ok { return c.Close() } @@ -216,9 +213,6 @@ func (s *reader) ReleaseMsg(msg []byte) { } func (s *reader) Close() error { - s.lock.Lock() - defer s.lock.Unlock() - if c, ok := s.R.(io.Closer); ok { return c.Close() } From 47494676768cf40b65472e2aff52eb50153162e1 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 11 Dec 2017 12:31:23 -0800 Subject: [PATCH 2/4] add tests for close when reading/writing --- msgio_test.go | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/msgio_test.go b/msgio_test.go index 66fa3a0..fd7c8c0 100644 --- a/msgio_test.go +++ b/msgio_test.go @@ -3,12 +3,13 @@ package msgio import ( "bytes" "fmt" - randbuf "github.com/jbenet/go-randbuf" "io" "math/rand" "sync" "testing" "time" + + randbuf "github.com/jbenet/go-randbuf" ) func TestReadWrite(t *testing.T) { @@ -195,3 +196,43 @@ func TestBadSizes(t *testing.T) { } _ = msg } + +func TestReadClose(t *testing.T) { + r, w := io.Pipe() + writer := NewWriter(w) + defer writer.Close() + reader := NewReader(r) + + buf := [10]byte{} + done := make(chan struct{}) + go func() { + defer close(done) + time.Sleep(10 * time.Millisecond) + reader.Close() + }() + n, err := reader.Read(buf[:]) + if n != 0 || err == nil { + t.Error("expected to read nothing") + } + <-done +} + +func TestWriteClose(t *testing.T) { + r, w := io.Pipe() + writer := NewWriter(w) + reader := NewReader(r) + defer reader.Close() + + buf := [10]byte{} + done := make(chan struct{}) + go func() { + defer close(done) + time.Sleep(10 * time.Millisecond) + writer.Close() + }() + n, err := writer.Write(buf[:]) + if n != 0 || err == nil { + t.Error("expected to read nothing") + } + <-done +} From 373c219f77da9c2593ea246c2ec1e6f091379aa9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 11 Dec 2017 12:39:07 -0800 Subject: [PATCH 3/4] also avoid locking while closing the varint reader/writers --- varint.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/varint.go b/varint.go index d942af3..2fce3b6 100644 --- a/varint.go +++ b/varint.go @@ -49,9 +49,6 @@ func (s *varintWriter) WriteMsg(msg []byte) error { } func (s *varintWriter) Close() error { - s.lock.Lock() - defer s.lock.Unlock() - if c, ok := s.W.(io.Closer); ok { return c.Close() } @@ -162,9 +159,6 @@ func (s *varintReader) ReleaseMsg(msg []byte) { } func (s *varintReader) Close() error { - s.lock.Lock() - defer s.lock.Unlock() - if c, ok := s.R.(io.Closer); ok { return c.Close() } From 3b66fd3b49bf095c5c20bcfb35a7b6676ed42c8a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 11 Dec 2017 12:39:19 -0800 Subject: [PATCH 4/4] test closing while reading/writing on varint readers/writers --- msgio_test.go | 24 ++++++++++++++++-------- varint_test.go | 15 +++++++++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/msgio_test.go b/msgio_test.go index fd7c8c0..6476e43 100644 --- a/msgio_test.go +++ b/msgio_test.go @@ -33,6 +33,20 @@ func TestReadWriteMsgSync(t *testing.T) { SubtestReadWriteMsgSync(t, writer, reader) } +func TestReadClose(t *testing.T) { + r, w := io.Pipe() + writer := NewWriter(w) + reader := NewReader(r) + SubtestReadClose(t, writer, reader) +} + +func TestWriteClose(t *testing.T) { + r, w := io.Pipe() + writer := NewWriter(w) + reader := NewReader(r) + SubtestWriteClose(t, writer, reader) +} + func SubtestReadWrite(t *testing.T, writer WriteCloser, reader ReadCloser) { msgs := [1000][]byte{} @@ -197,11 +211,8 @@ func TestBadSizes(t *testing.T) { _ = msg } -func TestReadClose(t *testing.T) { - r, w := io.Pipe() - writer := NewWriter(w) +func SubtestReadClose(t *testing.T, writer WriteCloser, reader ReadCloser) { defer writer.Close() - reader := NewReader(r) buf := [10]byte{} done := make(chan struct{}) @@ -217,10 +228,7 @@ func TestReadClose(t *testing.T) { <-done } -func TestWriteClose(t *testing.T) { - r, w := io.Pipe() - writer := NewWriter(w) - reader := NewReader(r) +func SubtestWriteClose(t *testing.T, writer WriteCloser, reader ReadCloser) { defer reader.Close() buf := [10]byte{} diff --git a/varint_test.go b/varint_test.go index c634150..84cb7f0 100644 --- a/varint_test.go +++ b/varint_test.go @@ -3,6 +3,7 @@ package msgio import ( "bytes" "encoding/binary" + "io" "testing" ) @@ -64,3 +65,17 @@ func SubtestVarintWrite(t *testing.T, msg []byte) { t.Fatalf("wrote incorrect number of bytes: %d != %d", len(bb), bblen) } } + +func TestVarintReadClose(t *testing.T) { + r, w := io.Pipe() + writer := NewVarintWriter(w) + reader := NewVarintReader(r) + SubtestReadClose(t, writer, reader) +} + +func TestVarintWriteClose(t *testing.T) { + r, w := io.Pipe() + writer := NewVarintWriter(w) + reader := NewVarintReader(r) + SubtestWriteClose(t, writer, reader) +}