Skip to content

Commit b776e88

Browse files
committed
[FAB-8790]: Parallelize gossip state trans. UT
Making gossip state transfer test to be executed in parallel. Change-Id: I301db6aa7dafcb597a492f29b653d0892a225d97 Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent 2c91c37 commit b776e88

File tree

1 file changed

+59
-33
lines changed

1 file changed

+59
-33
lines changed

gossip/state/state_test.go

+59-33
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@ import (
4444
)
4545

4646
var (
47-
portPrefix = 28000
48-
)
47+
portStartRange = 28000
4948

50-
var orgID = []byte("ORG1")
49+
orgID = []byte("ORG1")
5150

52-
type peerIdentityAcceptor func(identity api.PeerIdentityType) error
51+
noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error {
52+
return nil
53+
}
54+
)
5355

54-
var noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error {
55-
return nil
56-
}
56+
type peerIdentityAcceptor func(identity api.PeerIdentityType) error
5757

5858
type joinChanMsg struct {
5959
}
@@ -144,7 +144,7 @@ func (*cryptoServiceMock) ValidateIdentity(peerIdentity api.PeerIdentityType) er
144144
return nil
145145
}
146146

147-
func bootPeers(ids ...int) []string {
147+
func bootPeers(portPrefix int, ids ...int) []string {
148148
peers := []string{}
149149
for _, id := range ids {
150150
peers = append(peers, fmt.Sprintf("localhost:%d", id+portPrefix))
@@ -290,11 +290,11 @@ func (mock *ramLedger) Close() {
290290
}
291291

292292
// Default configuration to be used for gossip and communication modules
293-
func newGossipConfig(id int, boot ...int) *gossip.Config {
293+
func newGossipConfig(portPrefix, id int, boot ...int) *gossip.Config {
294294
port := id + portPrefix
295295
return &gossip.Config{
296296
BindPort: port,
297-
BootstrapPeers: bootPeers(boot...),
297+
BootstrapPeers: bootPeers(portPrefix, boot...),
298298
ID: fmt.Sprintf("p%d", id),
299299
MaxBlockCountToStore: 0,
300300
MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond,
@@ -373,12 +373,14 @@ func newPeerNode(config *gossip.Config, committer committer.Committer, acceptor
373373
}
374374

375375
func TestNilDirectMsg(t *testing.T) {
376+
t.Parallel()
376377
mc := &mockCommitter{}
377378
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
378379
g := &mocks.GossipMock{}
379380
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
380381
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
381-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
382+
portPrefix := portStartRange + 50
383+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
382384
defer p.shutdown()
383385
p.s.handleStateRequest(nil)
384386
p.s.directMessage(nil)
@@ -390,25 +392,29 @@ func TestNilDirectMsg(t *testing.T) {
390392
}
391393

392394
func TestNilAddPayload(t *testing.T) {
395+
t.Parallel()
393396
mc := &mockCommitter{}
394397
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
395398
g := &mocks.GossipMock{}
396399
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
397400
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
398-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
401+
portPrefix := portStartRange + 100
402+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
399403
defer p.shutdown()
400404
err := p.s.AddPayload(nil)
401405
assert.Error(t, err)
402406
assert.Contains(t, err.Error(), "nil")
403407
}
404408

405409
func TestAddPayloadLedgerUnavailable(t *testing.T) {
410+
t.Parallel()
406411
mc := &mockCommitter{}
407412
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
408413
g := &mocks.GossipMock{}
409414
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
410415
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
411-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
416+
portPrefix := portStartRange + 150
417+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
412418
defer p.shutdown()
413419
// Simulate a problem in the ledger
414420
failedLedger := mock.Mock{}
@@ -433,7 +439,7 @@ func TestLargeBlockGap(t *testing.T) {
433439
// than itself (500 blocks higher).
434440
// The peer needs to ask blocks in a way such that the size of the payload buffer
435441
// never rises above a certain threshold.
436-
442+
t.Parallel()
437443
mc := &mockCommitter{}
438444
blocksPassedToLedger := make(chan uint64, 200)
439445
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
@@ -481,7 +487,8 @@ func TestLargeBlockGap(t *testing.T) {
481487
SignedGossipMessage: sMsg,
482488
}
483489
})
484-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
490+
portPrefix := portStartRange + 200
491+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
485492
defer p.shutdown()
486493

487494
// Process blocks at a speed of 20 Millisecond for each block.
@@ -504,7 +511,7 @@ func TestOverPopulation(t *testing.T) {
504511
// with a gap in between, and ensure that the payload buffer
505512
// rejects blocks starting if the distance between the ledger height to the latest
506513
// block it contains is bigger than defMaxBlockDistance.
507-
514+
t.Parallel()
508515
mc := &mockCommitter{}
509516
blocksPassedToLedger := make(chan uint64, 10)
510517
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
@@ -514,7 +521,8 @@ func TestOverPopulation(t *testing.T) {
514521
g := &mocks.GossipMock{}
515522
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
516523
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
517-
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
524+
portPrefix := portStartRange + 250
525+
p := newPeerNode(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor)
518526
defer p.shutdown()
519527

520528
// Add some blocks in a sequential manner and make sure it works
@@ -567,6 +575,7 @@ func TestBlockingEnqueue(t *testing.T) {
567575
// Scenario: In parallel, get blocks from gossip and from the orderer.
568576
// The blocks from the orderer we get are X2 times the amount of blocks from gossip.
569577
// The blocks we get from gossip are random indices, to maximize disruption.
578+
t.Parallel()
570579
mc := &mockCommitter{}
571580
blocksPassedToLedger := make(chan uint64, 10)
572581
mc.On("CommitWithPvtData", mock.Anything).Run(func(arg mock.Arguments) {
@@ -576,7 +585,8 @@ func TestBlockingEnqueue(t *testing.T) {
576585
g := &mocks.GossipMock{}
577586
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
578587
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
579-
p := newPeerNode(newGossipConfig(0), mc, noopPeerIdentityAcceptor)
588+
portPrefix := portStartRange + 300
589+
p := newPeerNode(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor)
580590
defer p.shutdown()
581591

582592
numBlocksReceived := 500
@@ -631,6 +641,7 @@ func TestBlockingEnqueue(t *testing.T) {
631641
}
632642

633643
func TestHaltChainProcessing(t *testing.T) {
644+
t.Parallel()
634645
gossipChannel := func(c chan *proto.GossipMessage) <-chan *proto.GossipMessage {
635646
return c
636647
}
@@ -687,29 +698,33 @@ func TestHaltChainProcessing(t *testing.T) {
687698
v.On("Validate").Return(&errors2.VSCCExecutionFailureError{
688699
Reason: "foobar",
689700
}).Once()
690-
newPeerNodeWithGossipWithValidator(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g, v)
701+
portPrefix := portStartRange + 350
702+
newPeerNodeWithGossipWithValidator(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g, v)
691703
gossipMsgs <- newBlockMsg(1)
692704
logAsserter.assertLastLogContains(t, "Got error while committing")
693705
logAsserter.assertLastLogContains(t, "foobar", "Aborting chain processing")
694706
}
695707

696708
func TestFailures(t *testing.T) {
709+
t.Parallel()
710+
portPrefix := portStartRange + 400
697711
mc := &mockCommitter{}
698712
mc.On("LedgerHeight", mock.Anything).Return(uint64(0), nil)
699713
g := &mocks.GossipMock{}
700714
g.On("Accept", mock.Anything, false).Return(make(<-chan *proto.GossipMessage), nil)
701715
g.On("Accept", mock.Anything, true).Return(nil, make(chan proto.ReceivedMessage))
702716
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
703717
assert.Panics(t, func() {
704-
newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
718+
newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
705719
})
706720
// Reprogram mock
707721
mc.Mock = mock.Mock{}
708722
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), errors.New("Failed accessing ledger"))
709-
assert.Nil(t, newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g))
723+
assert.Nil(t, newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g))
710724
}
711725

712726
func TestGossipReception(t *testing.T) {
727+
t.Parallel()
713728
signalChan := make(chan struct{})
714729
rawblock := &pcomm.Block{
715730
Header: &pcomm.BlockHeader{
@@ -766,7 +781,8 @@ func TestGossipReception(t *testing.T) {
766781
receivedChan <- struct{}{}
767782
})
768783
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
769-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
784+
portPrefix := portStartRange + 450
785+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
770786
defer p.shutdown()
771787
select {
772788
case <-receivedChan:
@@ -781,6 +797,7 @@ func TestLedgerHeightFromProperties(t *testing.T) {
781797
// either set both metadata properly, or only the properties, or none, or both.
782798
// Ensure the logic handles all of the 4 possible cases as needed
783799

800+
t.Parallel()
784801
// Returns whether the given networkMember was selected or not
785802
wasNetworkMemberSelected := func(t *testing.T, networkMember discovery.NetworkMember, wg *sync.WaitGroup) bool {
786803
var wasGivenNetworkMemberSelected int32
@@ -811,7 +828,8 @@ func TestLedgerHeightFromProperties(t *testing.T) {
811828
})
812829
mc := &mockCommitter{}
813830
mc.On("LedgerHeight", mock.Anything).Return(uint64(1), nil)
814-
p := newPeerNodeWithGossip(newGossipConfig(0), mc, noopPeerIdentityAcceptor, g)
831+
portPrefix := portStartRange + 500
832+
p := newPeerNodeWithGossip(newGossipConfig(portPrefix, 0), mc, noopPeerIdentityAcceptor, g)
815833
defer p.shutdown()
816834
select {
817835
case <-time.After(time.Second * 20):
@@ -853,6 +871,7 @@ func TestLedgerHeightFromProperties(t *testing.T) {
853871
}
854872

855873
func TestAccessControl(t *testing.T) {
874+
t.Parallel()
856875
bootstrapSetSize := 5
857876
bootstrapSet := make([]*peerNode, 0)
858877

@@ -862,17 +881,17 @@ func TestAccessControl(t *testing.T) {
862881
"localhost:5618": {},
863882
"localhost:5621": {},
864883
}
884+
portPrefix := portStartRange + 600
865885

866886
blockPullPolicy := func(identity api.PeerIdentityType) error {
867887
if _, isAuthorized := authorizedPeers[string(identity)]; isAuthorized {
868888
return nil
869889
}
870890
return errors.New("Not authorized")
871891
}
872-
873892
for i := 0; i < bootstrapSetSize; i++ {
874893
commit := newCommitter()
875-
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), commit, blockPullPolicy))
894+
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(portPrefix, i), commit, blockPullPolicy))
876895
}
877896

878897
defer func() {
@@ -901,7 +920,7 @@ func TestAccessControl(t *testing.T) {
901920

902921
for i := 0; i < standardPeerSetSize; i++ {
903922
commit := newCommitter()
904-
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), commit, blockPullPolicy))
923+
peersSet = append(peersSet, newPeerNode(newGossipConfig(portPrefix, bootstrapSetSize+i, 0, 1, 2, 3, 4), commit, blockPullPolicy))
905924
}
906925

907926
defer func() {
@@ -943,12 +962,14 @@ func TestAccessControl(t *testing.T) {
943962
}
944963

945964
func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
965+
t.Parallel()
946966
bootstrapSetSize := 5
947967
bootstrapSet := make([]*peerNode, 0)
968+
portPrefix := portStartRange + 650
948969

949970
for i := 0; i < bootstrapSetSize; i++ {
950971
commit := newCommitter()
951-
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), commit, noopPeerIdentityAcceptor))
972+
bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(portPrefix, i), commit, noopPeerIdentityAcceptor))
952973
}
953974

954975
defer func() {
@@ -977,7 +998,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
977998

978999
for i := 0; i < standartPeersSize; i++ {
9791000
commit := newCommitter()
980-
peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), commit, noopPeerIdentityAcceptor))
1001+
peersSet = append(peersSet, newPeerNode(newGossipConfig(portPrefix, bootstrapSetSize+i, 0, 1, 2, 3, 4), commit, noopPeerIdentityAcceptor))
9811002
}
9821003

9831004
defer func() {
@@ -1012,10 +1033,12 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) {
10121033
}
10131034

10141035
func TestGossipStateProvider_TestStateMessages(t *testing.T) {
1015-
bootPeer := newPeerNode(newGossipConfig(0), newCommitter(), noopPeerIdentityAcceptor)
1036+
t.Parallel()
1037+
portPrefix := portStartRange + 700
1038+
bootPeer := newPeerNode(newGossipConfig(portPrefix, 0), newCommitter(), noopPeerIdentityAcceptor)
10161039
defer bootPeer.shutdown()
10171040

1018-
peer := newPeerNode(newGossipConfig(1, 0), newCommitter(), noopPeerIdentityAcceptor)
1041+
peer := newPeerNode(newGossipConfig(portPrefix, 1, 0), newCommitter(), noopPeerIdentityAcceptor)
10191042
defer peer.shutdown()
10201043

10211044
naiveStateMsgPredicate := func(message interface{}) bool {
@@ -1080,7 +1103,9 @@ func TestGossipStateProvider_TestStateMessages(t *testing.T) {
10801103
// complete missing blocks. Since state transfer messages now batched, it is expected
10811104
// to see _exactly_ two messages with state transfer response.
10821105
func TestNewGossipStateProvider_BatchingOfStateRequest(t *testing.T) {
1083-
bootPeer := newPeerNode(newGossipConfig(0), newCommitter(), noopPeerIdentityAcceptor)
1106+
t.Parallel()
1107+
portPrefix := portStartRange + 750
1108+
bootPeer := newPeerNode(newGossipConfig(portPrefix, 0), newCommitter(), noopPeerIdentityAcceptor)
10841109
defer bootPeer.shutdown()
10851110

10861111
msgCount := defAntiEntropyBatchSize + 5
@@ -1099,7 +1124,7 @@ func TestNewGossipStateProvider_BatchingOfStateRequest(t *testing.T) {
10991124
}
11001125
}
11011126

1102-
peer := newPeerNode(newGossipConfig(1, 0), newCommitter(), noopPeerIdentityAcceptor)
1127+
peer := newPeerNode(newGossipConfig(portPrefix, 1, 0), newCommitter(), noopPeerIdentityAcceptor)
11031128
defer peer.shutdown()
11041129

11051130
naiveStateMsgPredicate := func(message interface{}) bool {
@@ -1238,6 +1263,7 @@ type testData struct {
12381263
}
12391264

12401265
func TestTransferOfPrivateRWSet(t *testing.T) {
1266+
t.Parallel()
12411267
chainID := "testChainID"
12421268

12431269
// First gossip instance
@@ -1462,7 +1488,7 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
14621488
Test going to check that block from one peer will be replicated into second one and
14631489
have identical content.
14641490
*/
1465-
1491+
t.Parallel()
14661492
chainID := "testChainID"
14671493

14681494
// Initialize peer

0 commit comments

Comments
 (0)