Skip to content

Commit 7de3912

Browse files
committedSep 23, 2017
[FAB-5849] calibrate state transfer pace
Blocks received from peers via state transfer are added to the payload buffer right away regardless the payload buffer's size. In cases when state transfer is much faster than the commit process, blocks pile up in the payload buffer and the peer might be out of memory. This change set makes the method that handles payload reception from remote peers to add the payloads through the same code path that receives blocks from the orderer, which blocks in case the payload buffer is too overpopulated. Change-Id: I2fc1a916b809311a7d3aa0308b64d2127ad1ee60 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 51d4df6 commit 7de3912

File tree

3 files changed

+86
-18
lines changed

3 files changed

+86
-18
lines changed
 

‎gossip/state/mocks/gossip.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (g *GossipMock) Gossip(msg *proto.GossipMessage) {
6161
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
6262
args := g.Called(acceptor, passThrough)
6363
if args.Get(0) == nil {
64-
return nil, args.Get(1).(<-chan proto.ReceivedMessage)
64+
return nil, args.Get(1).(chan proto.ReceivedMessage)
6565
}
6666
return args.Get(0).(<-chan *proto.GossipMessage), nil
6767
}

‎gossip/state/state.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,10 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
477477
if max < payload.SeqNum {
478478
max = payload.SeqNum
479479
}
480-
err := s.payloads.Push(payload)
480+
481+
err := s.addPayload(payload, blocking)
481482
if err != nil {
482-
logger.Warningf("Payload with sequence number %d was received earlier", payload.SeqNum)
483+
logger.Warningf("Payload with sequence number %d wasn't added to payload buffer: %v", payload.SeqNum, err)
483484
}
484485
}
485486
return max, nil

‎gossip/state/state_test.go

+82-15
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ func TestNilDirectMsg(t *testing.T) {
288288
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
289289
g := &mocks.GossipMock{}
290290
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
291-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
291+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
292292
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
293293
defer p.shutdown()
294294
p.s.handleStateRequest(nil)
@@ -305,7 +305,7 @@ func TestNilAddPayload(t *testing.T) {
305305
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
306306
g := &mocks.GossipMock{}
307307
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
308-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
308+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
309309
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
310310
defer p.shutdown()
311311
err := p.s.AddPayload(nil)
@@ -318,7 +318,7 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
318318
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
319319
g := &mocks.GossipMock{}
320320
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
321-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
321+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
322322
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
323323
defer p.shutdown()
324324
// Simulate a problem in the ledger
@@ -339,6 +339,77 @@ func TestAddPayloadLedgerUnavailable(t *testing.T) {
339339
assert.Contains(t, err.Error(), "cannot query ledger")
340340
}
341341

342+
func TestLargeBlockGap(t *testing.T) {
343+
// Scenario: the peer knows of a peer who has a ledger height much higher
344+
// than itself (500 blocks higher).
345+
// The peer needs to ask blocks in a way such that the size of the payload buffer
346+
// never rises above a certain threshold.
347+
348+
mc := &mockCommitter{}
349+
blocksPassedToLedger := make(chan uint64, 200)
350+
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
351+
blocksPassedToLedger <- arg.Get(0).(*pcomm.Block).Header.Number
352+
})
353+
msgsFromPeer := make(chan proto.ReceivedMessage)
354+
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
355+
g := &mocks.GossipMock{}
356+
membership := []discovery.NetworkMember{
357+
{
358+
PKIid: common.PKIidType("a"),
359+
Endpoint: "a",
360+
Properties: &proto.Properties{
361+
LedgerHeight: 500,
362+
},
363+
}}
364+
g.On("PeersOfChannel", mock.Anything).Return(membership)
365+
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
366+
g.On("Accept", mock.Anything, true).Return(nil, msgsFromPeer)
367+
g.On("Send", mock.Anything, mock.Anything).Run(func(arguments mock.Arguments) {
368+
msg := arguments.Get(0).(*proto.GossipMessage)
369+
// The peer requested a state request
370+
req := msg.GetStateRequest()
371+
// Construct a skeleton for the response
372+
res := &proto.GossipMessage{
373+
Nonce: msg.Nonce,
374+
Channel: []byte(util.GetTestChainID()),
375+
Content: &proto.GossipMessage_StateResponse{
376+
StateResponse: &proto.RemoteStateResponse{},
377+
},
378+
}
379+
// Populate the response with payloads according to what the peer asked
380+
for seq := req.StartSeqNum; seq <= req.EndSeqNum; seq++ {
381+
rawblock := pcomm.NewBlock(seq, []byte{})
382+
b, _ := pb.Marshal(rawblock)
383+
payload := &proto.Payload{
384+
SeqNum: seq,
385+
Data: b,
386+
}
387+
res.GetStateResponse().Payloads = append(res.GetStateResponse().Payloads, payload)
388+
}
389+
// Finally, send the response down the channel the peer expects to receive it from
390+
sMsg, _ := res.NoopSign()
391+
msgsFromPeer <- &comm.ReceivedMessageImpl{
392+
SignedGossipMessage: sMsg,
393+
}
394+
})
395+
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
396+
defer p.shutdown()
397+
398+
// Process blocks at a speed of 20 Millisecond for each block.
399+
// The imaginative peer that responds to state
400+
// If the payload buffer expands above defMaxBlockDistance*2 + defAntiEntropyBatchSize blocks, fail the test
401+
blockProcessingTime := 20 * time.Millisecond // 10 seconds for total 500 blocks
402+
expectedSequence := 1
403+
for expectedSequence < 500 {
404+
blockSeq := <-blocksPassedToLedger
405+
assert.Equal(t, expectedSequence, int(blockSeq))
406+
// Ensure payload buffer isn't over-populated
407+
assert.True(t, p.s.payloads.Size() <= defMaxBlockDistance*2+defAntiEntropyBatchSize, "payload buffer size is %d", p.s.payloads.Size())
408+
expectedSequence++
409+
time.Sleep(blockProcessingTime)
410+
}
411+
}
412+
342413
func TestOverPopulation(t *testing.T) {
343414
// Scenario: Add to the state provider blocks
344415
// with a gap in between, and ensure that the payload buffer
@@ -353,7 +424,7 @@ func TestOverPopulation(t *testing.T) {
353424
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
354425
g := &mocks.GossipMock{}
355426
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
356-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
427+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
357428
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
358429
defer p.shutdown()
359430

@@ -415,7 +486,7 @@ func TestBlockingEnqueue(t *testing.T) {
415486
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
416487
g := &mocks.GossipMock{}
417488
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
418-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
489+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
419490
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
420491
defer p.shutdown()
421492

@@ -476,7 +547,7 @@ func TestFailures(t *testing.T) {
476547
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
477548
g := &mocks.GossipMock{}
478549
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
479-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
550+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
480551
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
481552
assert.Panics(t, func() {
482553
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
@@ -535,7 +606,7 @@ func TestGossipReception(t *testing.T) {
535606
g.On("Accept", mock.Anything, false).Return(rmc, nil).Run(func(_ mock.Arguments) {
536607
signalChan <- struct{}{}
537608
})
538-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
609+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
539610
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
540611
mc := &mockCommitter{}
541612
receivedChan := make(chan struct{})
@@ -576,7 +647,7 @@ func TestMetadataCompatibility(t *testing.T) {
576647
finChan <- struct{}{}
577648
})
578649
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
579-
g.On("Accept", mock.Anything, true).Return(nil, make(<-chan proto.ReceivedMessage))
650+
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
580651
metaState := common.NewNodeMetastate(5)
581652
b, _ := metaState.Bytes()
582653
defaultPeer := discovery.NetworkMember{
@@ -1149,12 +1220,8 @@ func TestTransferOfPrivateRWSet(t *testing.T) {
11491220
return ch
11501221
}
11511222

1152-
commChannelFactory := func(ch chan proto.ReceivedMessage) <-chan proto.ReceivedMessage {
1153-
return ch
1154-
}
1155-
11561223
g.On("Accept", mock.Anything, false).Return(gossipChannelFactory(gossipChannel), nil)
1157-
g.On("Accept", mock.Anything, true).Return(nil, commChannelFactory(commChannel))
1224+
g.On("Accept", mock.Anything, true).Return(nil, commChannel)
11581225

11591226
g.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
11601227
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
@@ -1331,7 +1398,7 @@ func (t testPeer) Gossip() <-chan *proto.GossipMessage {
13311398
return t.gossipChannel
13321399
}
13331400

1334-
func (t testPeer) Comm() <-chan proto.ReceivedMessage {
1401+
func (t testPeer) Comm() chan proto.ReceivedMessage {
13351402
return t.commChannel
13361403
}
13371404

@@ -1372,7 +1439,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
13721439
Return(nil, peer.Comm()).
13731440
Once().
13741441
On("Accept", mock.Anything, true).
1375-
Return(nil, make(<-chan proto.ReceivedMessage))
1442+
Return(nil, make(chan proto.ReceivedMessage))
13761443

13771444
peer.On("UpdateChannelMetadata", mock.Anything, mock.Anything)
13781445
peer.coord.On("Close")

0 commit comments

Comments
 (0)