Skip to content

Commit f75abd6

Browse files
committedMar 19, 2018
[FAB-7554] Include Envelopes in gossip membership API
The discovery service needs to provide clients with envelopes of peers so that the clients could (optionally) verify the signatures themselves if they desire. The discovery.NetworkMember already has an envelope from previous change sets, and we need to populate the field when we return the envelopes in the discovery and channel modules. Change-Id: If582553c4e19d3403c3f8728b3a6fb5a3f8e27ef Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent f4007a4 commit f75abd6

File tree

8 files changed

+120
-20
lines changed

8 files changed

+120
-20
lines changed
 

‎gossip/discovery/discovery_impl.go

+26-15
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, intern
410410
InternalEndpoint: internalEndpoint,
411411
}
412412

413-
aliveMsg, err := d.createAliveMessage(true)
413+
aliveMsg, err := d.createSignedAliveMessage(true)
414414
if err != nil {
415415
d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
416416
return
@@ -624,7 +624,7 @@ func (d *gossipDiscoveryImpl) sendMembershipRequest(member *NetworkMember, inclu
624624
}
625625

626626
func (d *gossipDiscoveryImpl) createMembershipRequest(includeInternalEndpoint bool) (*proto.GossipMessage, error) {
627-
am, err := d.createAliveMessage(includeInternalEndpoint)
627+
am, err := d.createSignedAliveMessage(includeInternalEndpoint)
628628
if err != nil {
629629
return nil, errors.WithStack(err)
630630
}
@@ -723,7 +723,7 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
723723
for !d.toDie() {
724724
d.logger.Debug("Sleeping", getAliveTimeInterval())
725725
time.Sleep(getAliveTimeInterval())
726-
msg, err := d.createAliveMessage(true)
726+
msg, err := d.createSignedAliveMessage(true)
727727
if err != nil {
728728
d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
729729
return
@@ -732,19 +732,16 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
732732
}
733733
}
734734

735-
func (d *gossipDiscoveryImpl) createAliveMessage(includeInternalEndpoint bool) (*proto.SignedGossipMessage, error) {
735+
func (d *gossipDiscoveryImpl) aliveMsgAndInternalEndpoint() (*proto.GossipMessage, string) {
736736
d.lock.Lock()
737+
defer d.lock.Unlock()
737738
d.seqNum++
738739
seqNum := d.seqNum
739-
740740
endpoint := d.self.Endpoint
741741
meta := d.self.Metadata
742742
pkiID := d.self.PKIid
743743
internalEndpoint := d.self.InternalEndpoint
744-
745-
d.lock.Unlock()
746-
747-
msg2Gossip := &proto.GossipMessage{
744+
msg := &proto.GossipMessage{
748745
Tag: proto.GossipMessage_EMPTY,
749746
Content: &proto.GossipMessage_AliveMsg{
750747
AliveMsg: &proto.AliveMessage{
@@ -760,13 +757,17 @@ func (d *gossipDiscoveryImpl) createAliveMessage(includeInternalEndpoint bool) (
760757
},
761758
},
762759
}
760+
return msg, internalEndpoint
761+
}
763762

764-
envp := d.crypt.SignMessage(msg2Gossip, internalEndpoint)
763+
func (d *gossipDiscoveryImpl) createSignedAliveMessage(includeInternalEndpoint bool) (*proto.SignedGossipMessage, error) {
764+
msg, internalEndpoint := d.aliveMsgAndInternalEndpoint()
765+
envp := d.crypt.SignMessage(msg, internalEndpoint)
765766
if envp == nil {
766767
return nil, errors.New("Failed signing message")
767768
}
768769
signedMsg := &proto.SignedGossipMessage{
769-
GossipMessage: msg2Gossip,
770+
GossipMessage: msg,
770771
Envelope: envp,
771772
}
772773

@@ -913,6 +914,7 @@ func (d *gossipDiscoveryImpl) GetMembership() []NetworkMember {
913914
Endpoint: member.Membership.Endpoint,
914915
Metadata: member.Membership.Metadata,
915916
InternalEndpoint: d.id2Member[string(m.GetAliveMsg().Membership.PkiId)].InternalEndpoint,
917+
Envelope: m.Envelope,
916918
})
917919
}
918920
return response
@@ -937,11 +939,20 @@ func (d *gossipDiscoveryImpl) UpdateEndpoint(endpoint string) {
937939
}
938940

939941
func (d *gossipDiscoveryImpl) Self() NetworkMember {
942+
var env *proto.Envelope
943+
msg, _ := d.aliveMsgAndInternalEndpoint()
944+
sMsg, err := msg.NoopSign()
945+
if err != nil {
946+
d.logger.Warning("Failed creating SignedGossipMessage:", err)
947+
} else {
948+
env = sMsg.Envelope
949+
}
950+
mem := msg.GetAliveMsg().Membership
940951
return NetworkMember{
941-
Endpoint: d.self.Endpoint,
942-
Metadata: d.self.Metadata,
943-
PKIid: d.self.PKIid,
944-
InternalEndpoint: d.self.InternalEndpoint,
952+
Endpoint: mem.Endpoint,
953+
Metadata: mem.Metadata,
954+
PKIid: mem.PkiId,
955+
Envelope: env,
945956
}
946957
}
947958

‎gossip/discovery/discovery_test.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,21 @@ func TestInitiateSync(t *testing.T) {
545545
stopInstances(t, instances)
546546
}
547547

548+
func TestSelf(t *testing.T) {
549+
t.Parallel()
550+
inst := createDiscoveryInstance(13463, "d1", []string{})
551+
defer inst.Stop()
552+
env := inst.Self().Envelope
553+
sMsg, err := env.ToGossipMessage()
554+
assert.NoError(t, err)
555+
member := sMsg.GetAliveMsg().Membership
556+
assert.Equal(t, "localhost:13463", member.Endpoint)
557+
assert.Equal(t, []byte("localhost:13463"), member.PkiId)
558+
559+
assert.Equal(t, "localhost:13463", inst.Self().Endpoint)
560+
assert.Equal(t, common.PKIidType("localhost:13463"), inst.Self().PKIid)
561+
}
562+
548563
func TestExpiration(t *testing.T) {
549564
t.Parallel()
550565
nodeNum := 5
@@ -974,7 +989,7 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
974989
}
975990
// Creating Alive messages
976991
for i := 0; i < peersNum; i++ {
977-
aliveMsg, _ := instances[i].discoveryImpl().createAliveMessage(true)
992+
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
978993
aliveMsgs = append(aliveMsgs, aliveMsg)
979994
}
980995

@@ -1042,15 +1057,15 @@ func TestMsgStoreExpirationWithMembershipMessages(t *testing.T) {
10421057
return k == i
10431058
},
10441059
func(k int) {
1045-
aliveMsg, _ := instances[k].discoveryImpl().createAliveMessage(true)
1060+
aliveMsg, _ := instances[k].discoveryImpl().createSignedAliveMessage(true)
10461061
memResp := instances[k].discoveryImpl().createMembershipResponse(aliveMsg, peerToResponse)
10471062
memRespMsgs[i] = append(memRespMsgs[i], memResp)
10481063
})
10491064
}
10501065

10511066
// Re-creating Alive msgs with highest seq_num, to make sure Alive msgs in memReq and memResp are older
10521067
for i := 0; i < peersNum; i++ {
1053-
aliveMsg, _ := instances[i].discoveryImpl().createAliveMessage(true)
1068+
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
10541069
newAliveMsgs = append(newAliveMsgs, aliveMsg)
10551070
}
10561071

@@ -1183,7 +1198,7 @@ func TestAliveMsgStore(t *testing.T) {
11831198
}
11841199
// Creating Alive messages
11851200
for i := 0; i < peersNum; i++ {
1186-
aliveMsg, _ := instances[i].discoveryImpl().createAliveMessage(true)
1201+
aliveMsg, _ := instances[i].discoveryImpl().createSignedAliveMessage(true)
11871202
aliveMsgs = append(aliveMsgs, aliveMsg)
11881203
}
11891204

‎gossip/gossip/channel/channel.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type Config struct {
4545
// GossipChannel defines an object that deals with all channel-related messages
4646
type GossipChannel interface {
4747

48+
// Self returns a StateInfoMessage about the peer
49+
Self() *proto.SignedGossipMessage
50+
4851
// GetPeers returns a list of peers with metadata as published by them
4952
GetPeers() []discovery.NetworkMember
5053

@@ -287,6 +290,13 @@ func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
287290
}
288291
}
289292

293+
// Self returns a StateInfoMessage about the peer
294+
func (gc *gossipChannel) Self() *proto.SignedGossipMessage {
295+
gc.RLock()
296+
defer gc.RUnlock()
297+
return gc.stateInfoMsg
298+
}
299+
290300
// LeaveChannel makes the peer leave the channel
291301
func (gc *gossipChannel) LeaveChannel() {
292302
gc.Lock()
@@ -309,7 +319,7 @@ func (gc *gossipChannel) hasLeftChannel() bool {
309319

310320
// GetPeers returns a list of peers with metadata as published by them
311321
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
312-
members := []discovery.NetworkMember{}
322+
var members []discovery.NetworkMember
313323
if gc.hasLeftChannel() {
314324
return members
315325
}
@@ -327,6 +337,7 @@ func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
327337
continue
328338
}
329339
member.Properties = stateInf.GetStateInfo().Properties
340+
member.Envelope = stateInf.Envelope
330341
members = append(members, member)
331342
}
332343
return members

‎gossip/gossip/channel/channel_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,29 @@ func TestBadInput(t *testing.T) {
282282
assert.False(t, gc.verifyMsg(&receivedMsg{msg: nil, PKIID: nil}))
283283
}
284284

285+
func TestSelf(t *testing.T) {
286+
t.Parallel()
287+
288+
cs := &cryptoService{}
289+
pkiID1 := common.PKIidType("1")
290+
jcm := &joinChanMsg{
291+
members2AnchorPeers: map[string][]api.AnchorPeer{
292+
string(orgInChannelA): {},
293+
},
294+
}
295+
adapter := new(gossipAdapterMock)
296+
configureAdapter(adapter)
297+
adapter.On("Gossip", mock.Anything)
298+
gc := NewGossipChannel(pkiID1, orgInChannelA, cs, channelA, adapter, jcm)
299+
gc.UpdateLedgerHeight(1)
300+
gMsg := gc.Self().GossipMessage
301+
env := gc.Self().Envelope
302+
sMsg, _ := env.ToGossipMessage()
303+
assert.Equal(t, gMsg, sMsg.GossipMessage)
304+
assert.Equal(t, gMsg.GetStateInfo().Properties.LedgerHeight, uint64(1))
305+
assert.Equal(t, gMsg.GetStateInfo().PkiId, []byte("1"))
306+
}
307+
285308
func TestMsgStoreNotExpire(t *testing.T) {
286309
t.Parallel()
287310

@@ -1660,6 +1683,10 @@ func TestChannelGetPeers(t *testing.T) {
16601683
assert.Len(t, gc.GetPeers(), 1)
16611684
assert.Equal(t, pkiIDInOrg1, gc.GetPeers()[0].PKIid)
16621685

1686+
// Ensure envelope from GetPeers is valid
1687+
gMsg, _ := gc.GetPeers()[0].Envelope.ToGossipMessage()
1688+
assert.Equal(t, []byte(pkiIDInOrg1), gMsg.GetStateInfo().PkiId)
1689+
16631690
gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1ButNotEligible, channelA), PKIID: pkiIDInOrg1ButNotEligible})
16641691
cs.On("VerifyByChannel", mock.Anything).Return(errors.New("Not eligible"))
16651692
cs.mocked = true

‎gossip/gossip/gossip.go

+6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121
// Gossip is the interface of the gossip component
2222
type Gossip interface {
2323

24+
// SelfMembershipInfo returns the peer's membership information
25+
SelfMembershipInfo() discovery.NetworkMember
26+
27+
// SelfChannelInfo returns the peer's latest StateInfo message of a given channel
28+
SelfChannelInfo(common.ChainID) *proto.SignedGossipMessage
29+
2430
// Send sends a message to remote peers
2531
Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer)
2632

‎gossip/gossip/gossip_impl.go

+14
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,20 @@ func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.N
727727
return gc.GetPeers()
728728
}
729729

730+
// SelfMembershipInfo returns the peer's membership information
731+
func (g *gossipServiceImpl) SelfMembershipInfo() discovery.NetworkMember {
732+
return g.disc.Self()
733+
}
734+
735+
// SelfChannelInfo returns the peer's latest StateInfo message of a given channel
736+
func (g *gossipServiceImpl) SelfChannelInfo(chain common.ChainID) *proto.SignedGossipMessage {
737+
ch := g.chanState.getGossipChannelByChainID(chain)
738+
if ch == nil {
739+
return nil
740+
}
741+
return ch.Self()
742+
}
743+
730744
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
731745
// only peer identities that match the given criteria, and that they published their channel participation
732746
func (g *gossipServiceImpl) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {

‎gossip/service/join_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ type gossipMock struct {
4040
mock.Mock
4141
}
4242

43+
func (g *gossipMock) SelfChannelInfo(common.ChainID) *proto.SignedGossipMessage {
44+
panic("implement me")
45+
}
46+
47+
func (g *gossipMock) SelfMembershipInfo() discovery.NetworkMember {
48+
panic("implement me")
49+
}
50+
4351
func (*gossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
4452
panic("implement me")
4553
}

‎gossip/state/mocks/gossip.go

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ type GossipMock struct {
2121
mock.Mock
2222
}
2323

24+
func (g *GossipMock) SelfMembershipInfo() discovery.NetworkMember {
25+
panic("implement me")
26+
}
27+
28+
func (g *GossipMock) SelfChannelInfo(common.ChainID) *proto.SignedGossipMessage {
29+
panic("implement me")
30+
}
31+
2432
func (*GossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
2533
panic("implement me")
2634
}

0 commit comments

Comments
 (0)
Please sign in to comment.