Skip to content

Commit e6bc6d7

Browse files
committedJul 23, 2017
[FAB-5433] Move costly checks from the critical path
In gossip, when a message that contains a block is sent to a peer (or a peer requests a block) - we consult the MessageCryptoService and ask it- is the given peer (that is considered to receive the block from us) among the channel readers. The problem is that this operation involves verifying ECDSA signatures and it is in the critical path of the dissemination. This commit moves the policy checks from the critical path to the time when the stateInfo messages that are used to verify the peers eligibility are being put into the in-memory cache: - Whenever the StateInfo message is attempted to be added into the in-memory cache, it is checked that the peer that signed it- is a channel reader and is in the channel - Whenver a config updates is received, all messages in the in-memory cache are validated, and if some are found to be invalid - they are deleted from the cache. With this commit: whenever the channel module is asked whether a given peer is eligible of receiving blocks, it simply does a lookup in the in-memory cache of StateInfo messages. More details can be found in the JIRA item. Change-Id: Id5c447e2ad221b3d5276bfc0e878e403ec73a5d3 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 3a4b1f2 commit e6bc6d7

File tree

2 files changed

+197
-10
lines changed

2 files changed

+197
-10
lines changed
 

‎gossip/gossip/channel/channel.go

+81-8
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,49 @@ func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.M
181181
pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
182182
return gc.Lookup(pkiID) == nil
183183
}
184-
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership)
184+
verifyStateInfoMsg := func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool {
185+
si := msg.GetStateInfo()
186+
// No point in verifying ourselves
187+
if bytes.Equal(gc.pkiID, si.PkiId) {
188+
return true
189+
}
190+
peerIdentity := adapter.GetIdentityByPKIID(si.PkiId)
191+
if len(peerIdentity) == 0 {
192+
gc.logger.Warning("Identity for peer", si.PkiId, "doesn't exist")
193+
return false
194+
}
195+
isOrgInChan := func(org api.OrgIdentityType) bool {
196+
if len(orgs) == 0 {
197+
if !gc.IsOrgInChannel(org) {
198+
return false
199+
}
200+
} else {
201+
found := false
202+
for _, chanMember := range orgs {
203+
if bytes.Equal(chanMember, org) {
204+
found = true
205+
break
206+
}
207+
}
208+
if !found {
209+
return false
210+
}
211+
}
212+
return true
213+
}
214+
215+
org := gc.GetOrgOfPeer(si.PkiId)
216+
if !isOrgInChan(org) {
217+
gc.logger.Warning("peer", peerIdentity, "'s organization(", string(org), ") isn't in the channel", string(chainID))
218+
return false
219+
}
220+
if err := gc.mcs.VerifyByChannel(chainID, peerIdentity, msg.Signature, msg.Payload); err != nil {
221+
gc.logger.Warning("Peer", peerIdentity, "isn't eligible for channel", string(chainID), ":", err)
222+
return false
223+
}
224+
return true
225+
}
226+
gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg)
185227

186228
ttl := election.GetMsgExpirationTimeout()
187229
pol := proto.NewGossipMessageComparator(0)
@@ -321,17 +363,16 @@ func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
321363
// EligibleForChannel returns whether the given member should get blocks
322364
// for this channel
323365
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
324-
if !gc.IsMemberInChan(member) {
366+
peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
367+
if len(peerIdentity) == 0 {
368+
gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
325369
return false
326370
}
327-
328-
identity := gc.GetIdentityByPKIID(member.PKIid)
329371
msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
330-
if msg == nil || identity == nil {
372+
if msg == nil {
331373
return false
332374
}
333-
334-
return gc.mcs.VerifyByChannel(gc.chainID, identity, msg.Envelope.Signature, msg.Envelope.Payload) == nil
375+
return true
335376
}
336377

337378
// AddToMsgStore adds a given GossipMessage to the message store
@@ -368,6 +409,7 @@ func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
368409

369410
gc.orgs = joinMsg.Members()
370411
gc.joinMsg = joinMsg
412+
gc.stateInfoMsgStore.validate(joinMsg.Members())
371413
}
372414

373415
// HandleMessage processes a message sent by a remote peer
@@ -663,10 +705,12 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
663705
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
664706
}
665707

666-
func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool) *stateInfoCache {
708+
func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool, verifyFunc membershipPredicate) *stateInfoCache {
667709
membershipStore := util.NewMembershipStore()
668710
pol := proto.NewGossipMessageComparator(0)
711+
669712
s := &stateInfoCache{
713+
verify: verifyFunc,
670714
MembershipStore: membershipStore,
671715
stopChan: make(chan struct{}),
672716
}
@@ -689,19 +733,40 @@ func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{})
689733
return s
690734
}
691735

736+
// membershipPredicate receives a StateInfoMessage and optionally a slice of organization identifiers
737+
// and returns whether the peer that signed the given StateInfoMessage is eligible
738+
// to the channel or not
739+
type membershipPredicate func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool
740+
692741
// stateInfoCache is actually a messageStore
693742
// that also indexes messages that are added
694743
// so that they could be extracted later
695744
type stateInfoCache struct {
745+
verify membershipPredicate
696746
*util.MembershipStore
697747
msgstore.MessageStore
698748
stopChan chan struct{}
699749
}
700750

751+
func (cache *stateInfoCache) validate(orgs []api.OrgIdentityType) {
752+
for _, m := range cache.Get() {
753+
msg := m.(*proto.SignedGossipMessage)
754+
if !cache.verify(msg, orgs...) {
755+
cache.delete(msg)
756+
}
757+
}
758+
}
759+
701760
// Add attempts to add the given message to the stateInfoCache,
702761
// and if the message was added, also indexes it.
703762
// Message must be a StateInfo message.
704763
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
764+
if !cache.MessageStore.CheckValid(msg) {
765+
return false
766+
}
767+
if !cache.verify(msg) {
768+
return false
769+
}
705770
added := cache.MessageStore.Add(msg)
706771
if added {
707772
pkiID := msg.GetStateInfo().PkiId
@@ -710,6 +775,14 @@ func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
710775
return added
711776
}
712777

778+
func (cache *stateInfoCache) delete(msg *proto.SignedGossipMessage) {
779+
cache.Purge(func(o interface{}) bool {
780+
pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
781+
return bytes.Equal(pkiID, msg.GetStateInfo().PkiId)
782+
})
783+
cache.Remove(msg.GetStateInfo().PkiId)
784+
}
785+
713786
func (cache *stateInfoCache) Stop() {
714787
cache.stopChan <- struct{}{}
715788
}

‎gossip/gossip/channel/channel_test.go

+116-2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ func (cs *cryptoService) VerifyByChannel(channel common.ChainID, identity api.Pe
112112
return nil
113113
}
114114
args := cs.Called(identity)
115+
if args.Get(0) == nil {
116+
return nil
117+
}
115118
return args.Get(0).(error)
116119
}
117120

@@ -222,6 +225,9 @@ func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdenti
222225
}
223226

224227
func (ga *gossipAdapterMock) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
228+
if ga.wasMocked("GetIdentityByPKIID") {
229+
return ga.Called(pkiID).Get(0).(api.PeerIdentityType)
230+
}
225231
return api.PeerIdentityType(pkiID)
226232
}
227233

@@ -676,10 +682,12 @@ func TestChannelPeerNotInChannel(t *testing.T) {
676682

677683
// Now for a more advanced scenario- the peer claims to be in the right org, and also claims to be in the channel
678684
// but the MSP declares it is not eligible for the channel
679-
// pkiIDInOrg1ButNotEligible
680685
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
686+
// configure MSP
681687
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
682688
cs.mocked = true
689+
// Simulate a config update
690+
gc.ConfigureChannel(&joinChanMsg{})
683691
helloMsg = createHelloMsg(pkiIDInOrg1ButNotEligible)
684692
helloMsg.On("Respond", mock.Anything).Run(messageRelayer)
685693
gc.HandleMessage(helloMsg)
@@ -1400,6 +1408,110 @@ func TestChannelNoAnchorPeers(t *testing.T) {
14001408
assert.True(t, gc.IsOrgInChannel(orgInChannelA))
14011409
}
14021410

1411+
func TestGossipChannelEligibility(t *testing.T) {
1412+
t.Parallel()
1413+
1414+
// Scenario: We have a peer in an org that joins a channel with org1 and org2.
1415+
// and it receives StateInfo messages of other peers and the eligibility
1416+
// of these peers of being in the channel is checked.
1417+
// During the test, the channel is reconfigured, and the expiration
1418+
// of the peer identities is simulated.
1419+
1420+
cs := &cryptoService{}
1421+
selfPKIID := common.PKIidType("p")
1422+
adapter := new(gossipAdapterMock)
1423+
pkiIDinOrg3 := common.PKIidType("pkiIDinOrg3")
1424+
members := []discovery.NetworkMember{
1425+
{PKIid: pkiIDInOrg1},
1426+
{PKIid: pkiIDInOrg1ButNotEligible},
1427+
{PKIid: pkiIDinOrg2},
1428+
}
1429+
adapter.On("GetMembership").Return(members)
1430+
adapter.On("Gossip", mock.Anything)
1431+
adapter.On("Send", mock.Anything, mock.Anything)
1432+
adapter.On("DeMultiplex", mock.Anything)
1433+
adapter.On("GetConf").Return(conf)
1434+
1435+
// At first, all peers are in the channel except pkiIDinOrg3
1436+
org1 := api.OrgIdentityType("ORG1")
1437+
org2 := api.OrgIdentityType("ORG2")
1438+
org3 := api.OrgIdentityType("ORG3")
1439+
1440+
adapter.On("GetOrgOfPeer", selfPKIID).Return(org1)
1441+
adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(org1)
1442+
adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(org2)
1443+
adapter.On("GetOrgOfPeer", pkiIDInOrg1ButNotEligible).Return(org1)
1444+
adapter.On("GetOrgOfPeer", pkiIDinOrg3).Return(org3)
1445+
1446+
gc := NewGossipChannel(selfPKIID, orgInChannelA, cs, channelA, adapter, &joinChanMsg{
1447+
members2AnchorPeers: map[string][]api.AnchorPeer{
1448+
string(org1): {},
1449+
string(org2): {},
1450+
},
1451+
})
1452+
// Every peer sends a StateInfo message
1453+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
1454+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
1455+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1ButNotEligible, channelA)})
1456+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg3, channelA)})
1457+
1458+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1459+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1460+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
1461+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1462+
1463+
// Remove org2 from the channel
1464+
gc.ConfigureChannel(&joinChanMsg{
1465+
members2AnchorPeers: map[string][]api.AnchorPeer{
1466+
string(org1): {},
1467+
},
1468+
})
1469+
1470+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1471+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1472+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
1473+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1474+
1475+
// Now simulate a config update that removed pkiIDInOrg1ButNotEligible from the channel readers
1476+
cs.mocked = true
1477+
cs.On("VerifyByChannel", api.PeerIdentityType(pkiIDInOrg1ButNotEligible)).Return(errors.New("Not a channel reader"))
1478+
cs.On("VerifyByChannel", mock.Anything).Return(nil)
1479+
gc.ConfigureChannel(&joinChanMsg{
1480+
members2AnchorPeers: map[string][]api.AnchorPeer{
1481+
string(org1): {},
1482+
},
1483+
})
1484+
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1485+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1486+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
1487+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1488+
1489+
// Now Simulate a certificate expiration of pkiIDInOrg1.
1490+
// This is done by asking the adapter to lookup the identity by PKI-ID, but if the certificate
1491+
// is expired, the mapping is deleted and hence the lookup yields nothing.
1492+
adapter.On("GetIdentityByPKIID", pkiIDInOrg1).Return(api.PeerIdentityType(nil))
1493+
adapter.On("GetIdentityByPKIID", pkiIDinOrg2).Return(api.PeerIdentityType(pkiIDinOrg2))
1494+
adapter.On("GetIdentityByPKIID", pkiIDInOrg1ButNotEligible).Return(api.PeerIdentityType(pkiIDInOrg1ButNotEligible))
1495+
adapter.On("GetIdentityByPKIID", pkiIDinOrg3).Return(api.PeerIdentityType(pkiIDinOrg3))
1496+
1497+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1498+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1499+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
1500+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1501+
1502+
// Now make another update of StateInfo messages, this time with updated ledger height (to overwrite earlier messages)
1503+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(2, pkiIDInOrg1, channelA)})
1504+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(2, pkiIDinOrg2, channelA)})
1505+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(2, pkiIDInOrg1ButNotEligible, channelA)})
1506+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(2, pkiIDinOrg3, channelA)})
1507+
1508+
// Ensure the access control resolution hasn't changed
1509+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1510+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1511+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
1512+
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1513+
}
1514+
14031515
func TestChannelGetPeers(t *testing.T) {
14041516
t.Parallel()
14051517

@@ -1420,7 +1532,7 @@ func TestChannelGetPeers(t *testing.T) {
14201532
{PKIid: pkiIDinOrg2},
14211533
}
14221534
configureAdapter(adapter, members...)
1423-
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
1535+
gc := NewGossipChannel(common.PKIidType("p0"), orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
14241536
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
14251537
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
14261538
assert.Len(t, gc.GetPeers(), 1)
@@ -1429,6 +1541,8 @@ func TestChannelGetPeers(t *testing.T) {
14291541
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
14301542
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
14311543
cs.mocked = true
1544+
// Simulate a config update
1545+
gc.ConfigureChannel(&joinChanMsg{})
14321546
assert.Len(t, gc.GetPeers(), 0)
14331547

14341548
// Now recreate gc and corrupt the MAC

0 commit comments

Comments
 (0)
Please sign in to comment.