Skip to content

Commit f4007a4

Browse files
committed
[FAB-7552] IdentityInfo in gossip identityStore
The discovery service needs to output identities of known peers. Gossip stores the identities in an identity mapper, which could be extended to return a snapshot of its identities. This change set makes it so that the identity mapper also saves the organizations of peers upon their insertion, and adds the needed method. Change-Id: I94b032781bae5d07a3d9fef586273ac8bfcc2e4d Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent daaf524 commit f4007a4

File tree

9 files changed

+101
-16
lines changed

9 files changed

+101
-16
lines changed

gossip/comm/comm_test.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ var (
5656
type naiveSecProvider struct {
5757
}
5858

59+
func (*naiveSecProvider) OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType {
60+
return nil
61+
}
62+
5963
func (*naiveSecProvider) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
6064
return time.Now().Add(time.Hour), nil
6165
}
@@ -102,10 +106,10 @@ func (*naiveSecProvider) VerifyByChannel(_ common.ChainID, _ api.PeerIdentityTyp
102106
return nil
103107
}
104108

105-
func newCommInstance(port int, sec api.MessageCryptoService) (Comm, error) {
109+
func newCommInstance(port int, sec *naiveSecProvider) (Comm, error) {
106110
endpoint := fmt.Sprintf("localhost:%d", port)
107111
id := []byte(endpoint)
108-
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec, id, noopPurgeIdentity), id, nil)
112+
inst, err := NewCommInstanceWithServer(port, identity.NewIdentityMapper(sec, id, noopPurgeIdentity, sec), id, nil)
109113
return inst, err
110114
}
111115

@@ -222,7 +226,7 @@ func TestHandshake(t *testing.T) {
222226
assert.NoError(t, err)
223227
s := grpc.NewServer()
224228
id := []byte("localhost:9611")
225-
idMapper := identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity)
229+
idMapper := identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity, naiveSec)
226230
inst, err := NewCommInstance(s, nil, idMapper, api.PeerIdentityType("localhost:9611"), func() []grpc.DialOption {
227231
return []grpc.DialOption{grpc.WithInsecure()}
228232
})
@@ -354,14 +358,14 @@ func TestProdConstructor(t *testing.T) {
354358
defer srv.Stop()
355359
defer lsnr.Close()
356360
id := []byte("localhost:29000")
357-
comm1, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
361+
comm1, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity, naiveSec), id, dialOpts)
358362
go srv.Serve(lsnr)
359363

360364
srv, lsnr, dialOpts, certs = createGRPCLayer(39000)
361365
defer srv.Stop()
362366
defer lsnr.Close()
363367
id = []byte("localhost:39000")
364-
comm2, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
368+
comm2, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity, naiveSec), id, dialOpts)
365369
go srv.Serve(lsnr)
366370
defer comm1.Stop()
367371
defer comm2.Stop()

gossip/gossip/certstore_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
437437
Mediator: pullMediator,
438438
}, identity.NewIdentityMapper(cs, selfIdentity, func(pkiID common.PKIidType, _ api.PeerIdentityType) {
439439
pullMediator.Remove(string(pkiID))
440-
}), selfIdentity, cs)
440+
}, cs), selfIdentity, cs)
441441

442442
wg := sync.WaitGroup{}
443443
wg.Add(1)

gossip/gossip/gossip.go

+3
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Gossip interface {
7272
// any connections to peers with identities that are found invalid
7373
SuspectPeers(s api.PeerSuspector)
7474

75+
// IdentityInfo returns information known peer identities
76+
IdentityInfo() api.PeerIdentitySet
77+
7578
// Stop stops the gossip component
7679
Stop()
7780
}

gossip/gossip/gossip_impl.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
9191
g.idMapper = identity.NewIdentityMapper(mcs, selfIdentity, func(pkiID common.PKIidType, identity api.PeerIdentityType) {
9292
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
9393
g.certPuller.Remove(string(pkiID))
94-
})
94+
}, secAdvisor)
9595

9696
if s == nil {
9797
g.comm, err = createCommWithServer(conf.BindPort, g.idMapper, selfIdentity, secureDialOpts)
@@ -604,6 +604,11 @@ func (g *gossipServiceImpl) removeSelfLoop(msg *emittedGossipMessage, peers []*c
604604
return result
605605
}
606606

607+
// IdentityInfo returns information known peer identities
608+
func (g *gossipServiceImpl) IdentityInfo() api.PeerIdentitySet {
609+
return g.idMapper.IdentityInfo()
610+
}
611+
607612
// SendByCriteria sends a given message to all peers that match the given SendCriteria
608613
func (g *gossipServiceImpl) SendByCriteria(msg *proto.SignedGossipMessage, criteria SendCriteria) error {
609614
if criteria.Timeout == 0 {

gossip/gossip/gossip_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ type naiveCryptoService struct {
126126
revokedPkiIDS map[string]struct{}
127127
}
128128

129+
func (cs *naiveCryptoService) OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType {
130+
return nil
131+
}
132+
129133
func (*naiveCryptoService) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
130134
if exp, exists := expirationTimes[string(peerIdentity)]; exists {
131135
return exp, nil

gossip/identity/identity.go

+25-3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type Mapper interface {
4747
// SuspectPeers re-validates all peers that match the given predicate
4848
SuspectPeers(isSuspected api.PeerSuspector)
4949

50+
// IdentityInfo returns information known peer identities
51+
IdentityInfo() api.PeerIdentitySet
52+
5053
// Stop stops all background computations of the Mapper
5154
Stop()
5255
}
@@ -57,6 +60,7 @@ type purgeTrigger func(pkiID common.PKIidType, identity api.PeerIdentityType)
5760
type identityMapperImpl struct {
5861
onPurge purgeTrigger
5962
mcs api.MessageCryptoService
63+
sa api.SecurityAdvisor
6064
pkiID2Cert map[string]*storedIdentity
6165
sync.RWMutex
6266
stopChan chan struct{}
@@ -65,14 +69,15 @@ type identityMapperImpl struct {
6569
}
6670

6771
// NewIdentityMapper method, all we need is a reference to a MessageCryptoService
68-
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, onPurge purgeTrigger) Mapper {
72+
func NewIdentityMapper(mcs api.MessageCryptoService, selfIdentity api.PeerIdentityType, onPurge purgeTrigger, sa api.SecurityAdvisor) Mapper {
6973
selfPKIID := mcs.GetPKIidOfCert(selfIdentity)
7074
idMapper := &identityMapperImpl{
7175
onPurge: onPurge,
7276
mcs: mcs,
7377
pkiID2Cert: make(map[string]*storedIdentity),
7478
stopChan: make(chan struct{}),
7579
selfPKIID: string(selfPKIID),
80+
sa: sa,
7681
}
7782
if err := idMapper.Put(selfPKIID, selfIdentity); err != nil {
7883
panic(errors.Wrap(err, "Failed putting our own identity into the identity mapper"))
@@ -138,7 +143,7 @@ func (is *identityMapperImpl) Put(pkiID common.PKIidType, identity api.PeerIdent
138143
})
139144
}
140145

141-
is.pkiID2Cert[string(id)] = newStoredIdentity(pkiID, identity, expirationTimer)
146+
is.pkiID2Cert[string(id)] = newStoredIdentity(pkiID, identity, expirationTimer, is.sa.OrgByPeerIdentity(identity))
142147
return nil
143148
}
144149

@@ -210,6 +215,21 @@ func (is *identityMapperImpl) validateIdentities(isSuspected api.PeerSuspector)
210215
return revokedIdentities
211216
}
212217

218+
// IdentityInfo returns information known peer identities
219+
func (is *identityMapperImpl) IdentityInfo() api.PeerIdentitySet {
220+
var res api.PeerIdentitySet
221+
is.RLock()
222+
defer is.RUnlock()
223+
for _, storedIdentity := range is.pkiID2Cert {
224+
res = append(res, api.PeerIdentityInfo{
225+
Identity: storedIdentity.peerIdentity,
226+
PKIId: storedIdentity.pkiID,
227+
Organization: storedIdentity.orgId,
228+
})
229+
}
230+
return res
231+
}
232+
213233
func (is *identityMapperImpl) delete(pkiID common.PKIidType, identity api.PeerIdentityType) {
214234
is.Lock()
215235
defer is.Unlock()
@@ -221,15 +241,17 @@ type storedIdentity struct {
221241
pkiID common.PKIidType
222242
lastAccessTime int64
223243
peerIdentity api.PeerIdentityType
244+
orgId api.OrgIdentityType
224245
expirationTimer *time.Timer
225246
}
226247

227-
func newStoredIdentity(pkiID common.PKIidType, identity api.PeerIdentityType, expirationTimer *time.Timer) *storedIdentity {
248+
func newStoredIdentity(pkiID common.PKIidType, identity api.PeerIdentityType, expirationTimer *time.Timer, org api.OrgIdentityType) *storedIdentity {
228249
return &storedIdentity{
229250
pkiID: pkiID,
230251
lastAccessTime: time.Now().UnixNano(),
231252
peerIdentity: identity,
232253
expirationTimer: expirationTimer,
254+
orgId: org,
233255
}
234256
}
235257

gossip/identity/identity_test.go

+44-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"testing"
1414
"time"
1515

16+
"strings"
17+
1618
"github.com/hyperledger/fabric/gossip/api"
1719
"github.com/hyperledger/fabric/gossip/common"
1820
"github.com/hyperledger/fabric/gossip/util"
@@ -40,6 +42,19 @@ func init() {
4042
msgCryptoService.On("Expiration", api.PeerIdentityType("invalidIdentity")).Return(time.Now().Add(time.Hour), nil)
4143
}
4244

45+
func (cs *naiveCryptoService) OrgByPeerIdentity(id api.PeerIdentityType) api.OrgIdentityType {
46+
found := false
47+
for _, call := range cs.Mock.ExpectedCalls {
48+
if call.Method == "OrgByPeerIdentity" {
49+
found = true
50+
}
51+
}
52+
if !found {
53+
return nil
54+
}
55+
return cs.Called(id).Get(0).(api.OrgIdentityType)
56+
}
57+
4358
func (cs *naiveCryptoService) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
4459
args := cs.Called(peerIdentity)
4560
t, err := args.Get(0), args.Get(1)
@@ -91,7 +106,7 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature,
91106
}
92107

93108
func TestPut(t *testing.T) {
94-
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
109+
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger, msgCryptoService)
95110
identity := []byte("yacovm")
96111
identity2 := []byte("not-yacovm")
97112
identity3 := []byte("invalidIdentity")
@@ -109,7 +124,7 @@ func TestPut(t *testing.T) {
109124
}
110125

111126
func TestGet(t *testing.T) {
112-
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
127+
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger, msgCryptoService)
113128
identity := []byte("yacovm")
114129
identity2 := []byte("not-yacovm")
115130
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
@@ -124,7 +139,7 @@ func TestGet(t *testing.T) {
124139
}
125140

126141
func TestVerify(t *testing.T) {
127-
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger)
142+
idStore := NewIdentityMapper(msgCryptoService, dummyID, noopPurgeTrigger, msgCryptoService)
128143
identity := []byte("yacovm")
129144
identity2 := []byte("not-yacovm")
130145
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
@@ -152,7 +167,7 @@ func TestListInvalidIdentities(t *testing.T) {
152167
selfPKIID := msgCryptoService.GetPKIidOfCert(dummyID)
153168
idStore := NewIdentityMapper(msgCryptoService, dummyID, func(_ common.PKIidType, identity api.PeerIdentityType) {
154169
deletedIdentities <- string(identity)
155-
})
170+
}, msgCryptoService)
156171
identity := []byte("yacovm")
157172
// Test for a revoked identity
158173
pkiID := msgCryptoService.GetPKIidOfCert(api.PeerIdentityType(identity))
@@ -228,7 +243,7 @@ func TestExpiration(t *testing.T) {
228243
SetIdentityUsageThreshold(time.Second * 500)
229244
idStore := NewIdentityMapper(msgCryptoService, dummyID, func(_ common.PKIidType, identity api.PeerIdentityType) {
230245
deletedIdentities <- string(identity)
231-
})
246+
}, msgCryptoService)
232247
assertDeletedIdentity := func(expected string) {
233248
select {
234249
case <-time.After(time.Second * 10):
@@ -296,6 +311,29 @@ func TestExpirationPanic(t *testing.T) {
296311
identity3 := []byte("invalidIdentity")
297312
msgCryptoService.revokedIdentities[string(identity3)] = struct{}{}
298313
assert.Panics(t, func() {
299-
NewIdentityMapper(msgCryptoService, identity3, noopPurgeTrigger)
314+
NewIdentityMapper(msgCryptoService, identity3, noopPurgeTrigger, msgCryptoService)
300315
})
301316
}
317+
318+
func TestIdentityInfo(t *testing.T) {
319+
cs := &naiveCryptoService{}
320+
alice := api.PeerIdentityType("alicePeer")
321+
bob := api.PeerIdentityType("bobPeer")
322+
aliceID := cs.GetPKIidOfCert(alice)
323+
bobId := cs.GetPKIidOfCert(bob)
324+
cs.On("OrgByPeerIdentity", dummyID).Return(api.OrgIdentityType("D"))
325+
cs.On("OrgByPeerIdentity", alice).Return(api.OrgIdentityType("A"))
326+
cs.On("OrgByPeerIdentity", bob).Return(api.OrgIdentityType("B"))
327+
cs.On("Expiration", mock.Anything).Return(time.Now().Add(time.Minute), nil)
328+
idStore := NewIdentityMapper(cs, dummyID, noopPurgeTrigger, cs)
329+
idStore.Put(aliceID, alice)
330+
idStore.Put(bobId, bob)
331+
for org, id := range idStore.IdentityInfo().ByOrg() {
332+
identity := string(id[0].Identity)
333+
pkiID := string(id[0].PKIId)
334+
orgId := string(id[0].Organization)
335+
assert.Equal(t, org, orgId)
336+
assert.Equal(t, strings.ToLower(org), string(identity[0]))
337+
assert.Equal(t, strings.ToLower(org), string(pkiID[0]))
338+
}
339+
}

gossip/service/join_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (g *gossipMock) LeaveChan(chainID common.ChainID) {
9292
panic("implement me")
9393
}
9494

95+
func (g *gossipMock) IdentityInfo() api.PeerIdentitySet {
96+
panic("implement me")
97+
}
98+
9599
func (*gossipMock) Stop() {
96100
panic("implement me")
97101
}

gossip/state/mocks/gossip.go

+5
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (
7777
func (g *GossipMock) JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID) {
7878
}
7979

80+
// IdentityInfo returns information known peer identities
81+
func (g *GossipMock) IdentityInfo() api.PeerIdentitySet {
82+
panic("not implemented")
83+
}
84+
8085
func (g *GossipMock) Stop() {
8186

8287
}

0 commit comments

Comments
 (0)