Skip to content

Commit f9e47ad

Browse files
committed
[FAB-10469] Gossip: Don't lock when sending
The gossip connection locks the connection to preserve FIFO order when waiting for room in the output buffer to be available. A gossip connection has a corresponding goroutine for each reading or writing from the input/output buffer for the connection. When a connection is ordered to send a message, it tries to enqueue it into the output buffer, and if there is no room - it discards the message to not block the application layer. However, private data messages shouldn't be discarded and the goroutine sending them should wait for room to be available in the buffer. The problem is, that this is done under a lock and thus the following might occur: peer p sends to peer q a private data message and q sends one to p. If p's and q's output buffers are full, the goroutine that send()s holds the lock, and waits for the output buffer to have room. However, to drain the receive buffer on the other side, both peers also need to obtain a lock on the connection. This results in a distributed deadlock. The most sensible fix in my opinion is to just not lock the connection when waiting on the output buffer. Change-Id: I63a64e9cf08364d2023d99f2bedb1e382765e6a8 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent ca6dbc6 commit f9e47ad

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

gossip/comm/comm_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,50 @@ func TestViperConfig(t *testing.T) {
202202
assert.Equal(t, 200, util.GetIntOrDefault("peer.gossip.sendBuffSize", 0))
203203
}
204204

205+
func TestMutualParallelSendWithAck(t *testing.T) {
206+
t.Parallel()
207+
208+
// This test tests concurrent and parallel sending of many (1000) messages
209+
// from 2 instances to one another at the same time.
210+
211+
msgNum := 1000
212+
213+
comm1, _ := newCommInstance(15201, naiveSec)
214+
comm2, _ := newCommInstance(15202, naiveSec)
215+
defer comm1.Stop()
216+
defer comm2.Stop()
217+
218+
acceptData := func(o interface{}) bool {
219+
return o.(proto.ReceivedMessage).GetGossipMessage().IsDataMsg()
220+
}
221+
222+
inc1 := comm1.Accept(acceptData)
223+
inc2 := comm2.Accept(acceptData)
224+
225+
// Send a message from comm1 to comm2, to make the instances establish a preliminary connection
226+
comm1.Send(createGossipMsg(), remotePeer(15202))
227+
// Wait for the message to be received in comm2
228+
<-inc2
229+
230+
for i := 0; i < msgNum; i++ {
231+
go comm1.SendWithAck(createGossipMsg(), time.Second*5, 1, remotePeer(15202))
232+
}
233+
234+
for i := 0; i < msgNum; i++ {
235+
go comm2.SendWithAck(createGossipMsg(), time.Second*5, 1, remotePeer(15201))
236+
}
237+
238+
go func() {
239+
for i := 0; i < msgNum; i++ {
240+
<-inc1
241+
}
242+
}()
243+
244+
for i := 0; i < msgNum; i++ {
245+
<-inc2
246+
}
247+
}
248+
205249
func TestHandshake(t *testing.T) {
206250
t.Parallel()
207251
signer := func(msg []byte) ([]byte, error) {

gossip/comm/conn.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -256,22 +256,21 @@ func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error),
256256
conn.logger.Debug("Aborting send() to ", conn.info.Endpoint, "because connection is closing")
257257
return
258258
}
259-
conn.Lock()
260-
defer conn.Unlock()
261259

262260
m := &msgSending{
263261
envelope: msg.Envelope,
264262
onErr: onErr,
265263
}
266264

267-
if len(conn.outBuff) == util.GetIntOrDefault("peer.gossip.sendBuffSize", defSendBuffSize) {
265+
if len(conn.outBuff) == cap(conn.outBuff) {
268266
if conn.logger.IsEnabledFor(logging.DEBUG) {
269267
conn.logger.Debug("Buffer to", conn.info.Endpoint, "overflowed, dropping message", msg.String())
270268
}
271269
if !shouldBlock {
272270
return
273271
}
274272
}
273+
275274
conn.outBuff <- m
276275
}
277276

0 commit comments

Comments
 (0)