Skip to content

Commit a7f948d

Browse files
committed
[FAB-9443] Enhance discovery client expressiveness
This change set enhances the discovery client reference API with abilities that enable the user more control over how the endorsers are selected, i.e: - Prefer endorsers with a higher ledger height - Exclude certain peers from being chosen, which is useful for cases where the SDK detected an unreachable peer and when it requests from the discovery client object an endorser set it would want the discovery service to skip selecting that peer. Also, this change set: - Adds input validation to StateInfo and AliveMessages returned from peers - Fixes golint warnings Change-Id: Ib8de21c91ecf7706ab42ec898e00d0e2f40af8be Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 968d12b commit a7f948d

File tree

5 files changed

+484
-40
lines changed

5 files changed

+484
-40
lines changed

discovery/client/api.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/hyperledger/fabric/protos/discovery"
1111
"github.com/hyperledger/fabric/protos/gossip"
1212
"github.com/pkg/errors"
13-
"golang.org/x/net/context"
1413
"google.golang.org/grpc"
1514
)
1615

@@ -26,12 +25,6 @@ type Signer func(msg []byte) ([]byte, error)
2625
// Dialer connects to the server
2726
type Dialer func() (*grpc.ClientConn, error)
2827

29-
// Client defines the client-side API of the discovery service
30-
type Client interface {
31-
// Send sends the Request and returns the response, or error on failure
32-
Send(context.Context, *Request) (Response, error)
33-
}
34-
3528
// Response aggregates several responses from the discovery service
3629
type Response interface {
3730
// ForChannel returns a ChannelResponse in the context of a given channel
@@ -50,7 +43,8 @@ type ChannelResponse interface {
5043
// chaincode in a given channel context, or error if something went wrong.
5144
// The method returns a random set of endorsers, such that signatures from all of them
5245
// combined, satisfy the endorsement policy.
53-
Endorsers(string) (Endorsers, error)
46+
// The selection is based on the given selection hint
47+
Endorsers(cc string, hint Selector) (Endorsers, error)
5448
}
5549

5650
// Endorsers defines a set of peers that are sufficient

discovery/client/client.go

+74-24
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"time"
1414

1515
"github.com/golang/protobuf/proto"
16-
"github.com/hyperledger/fabric/gossip/util"
1716
"github.com/hyperledger/fabric/protos/discovery"
17+
"github.com/hyperledger/fabric/protos/gossip"
1818
"github.com/hyperledger/fabric/protos/msp"
1919
"github.com/pkg/errors"
2020
)
@@ -23,7 +23,8 @@ var (
2323
configTypes = []discovery.QueryType{discovery.ConfigQueryType, discovery.PeerMembershipQueryType, discovery.ChaincodeQueryType}
2424
)
2525

26-
type client struct {
26+
// Client interacts with the discovery server
27+
type Client struct {
2728
lastRequest []byte
2829
lastSignature []byte
2930
createConnection Dialer
@@ -109,7 +110,7 @@ func (req *Request) addQueryMapping(queryType discovery.QueryType, key string) {
109110
}
110111

111112
// Send sends the request and returns the response, or error on failure
112-
func (c *client) Send(ctx context.Context, req *Request) (Response, error) {
113+
func (c *Client) Send(ctx context.Context, req *Request) (Response, error) {
113114
req.Authentication = c.authInfo
114115
payload, err := proto.Marshal(req.Request)
115116
if err != nil {
@@ -119,7 +120,7 @@ func (c *client) Send(ctx context.Context, req *Request) (Response, error) {
119120
sig := c.lastSignature
120121
// Only sign the Request if it is different than the previous Request sent.
121122
// Otherwise, use the last signature from the previous send.
122-
// This is not only to save CPU cycles in the client-side,
123+
// This is not only to save CPU cycles in the Client-side,
123124
// but also for the server side to be able to memoize the signature verification.
124125
// We have the use the previous signature, because many signature schemes are not deterministic.
125126
if !bytes.Equal(c.lastRequest, payload) {
@@ -201,7 +202,7 @@ func (cr *channelResponse) Peers() ([]*Peer, error) {
201202
return nil, res.(error)
202203
}
203204

204-
func (cr *channelResponse) Endorsers(cc string) (Endorsers, error) {
205+
func (cr *channelResponse) Endorsers(cc string, s Selector) (Endorsers, error) {
205206
// If we have a key that has no chaincode field,
206207
// it means it's an error returned from the service
207208
if err, exists := cr.response[key{
@@ -224,18 +225,33 @@ func (cr *channelResponse) Endorsers(cc string) (Endorsers, error) {
224225

225226
desc := res.(*endorsementDescriptor)
226227
rand.Seed(time.Now().Unix())
227-
randomLayoutIndex := rand.Intn(len(desc.layouts))
228-
layout := desc.layouts[randomLayoutIndex]
228+
// We iterate over all layouts to find one that we have enough peers to select
229+
for _, index := range rand.Perm(len(desc.layouts)) {
230+
layout := desc.layouts[index]
231+
endorsers, canLayoutBeSatisfied := selectPeersForLayout(desc.endorsersByGroups, layout, s)
232+
if canLayoutBeSatisfied {
233+
return endorsers, nil
234+
}
235+
}
236+
return nil, errors.New("no endorsement combination can be satisfied")
237+
}
238+
239+
func selectPeersForLayout(endorsersByGroups map[string][]*Peer, layout map[string]int, s Selector) (Endorsers, bool) {
229240
var endorsers []*Peer
230241
for grp, count := range layout {
231-
endorsersOfGrp := randomEndorsers(count, desc.endorsersByGroups[grp])
242+
shuffledEndorsers := Endorsers(endorsersByGroups[grp]).Shuffle()
243+
endorsersOfGrp := s.Select(shuffledEndorsers)
244+
// We couldn't select enough peers for this layout because the current group
245+
// requires more peers than we have available to be selected
232246
if len(endorsersOfGrp) < count {
233-
return nil, errors.Errorf("layout has a group that requires at least %d peers, but only %d peers are known", count, len(endorsersOfGrp))
247+
return nil, false
234248
}
249+
endorsersOfGrp = endorsersOfGrp[:count]
235250
endorsers = append(endorsers, endorsersOfGrp...)
236251
}
237-
238-
return endorsers, nil
252+
// The current (randomly chosen) layout can be satisfied, so return it
253+
// instead of checking the next one.
254+
return endorsers, true
239255
}
240256

241257
func (resp response) ForChannel(ch string) ChannelResponse {
@@ -330,6 +346,12 @@ func peersForChannel(membersRes *discovery.PeerMembershipResult) ([]*Peer, error
330346
if err != nil {
331347
return nil, errors.Wrap(err, "failed unmarshaling stateInfo message")
332348
}
349+
if err := validateAliveMessage(aliveMsg); err != nil {
350+
return nil, errors.Wrap(err, "failed validating alive message")
351+
}
352+
if err := validateStateInfoMessage(stateInfoMsg); err != nil {
353+
return nil, errors.Wrap(err, "failed validating stateInfo message")
354+
}
333355
peers = append(peers, &Peer{
334356
MSPID: org,
335357
Identity: peer.Identity,
@@ -426,35 +448,63 @@ func endorser(peer *discovery.Peer, chaincode, channel string) (*Peer, error) {
426448
if err != nil {
427449
return nil, errors.Wrap(err, "failed unmarshaling gossip envelope to state info message")
428450
}
429-
sId := &msp.SerializedIdentity{}
430-
if err := proto.Unmarshal(peer.Identity, sId); err != nil {
451+
if err := validateAliveMessage(aliveMsg); err != nil {
452+
return nil, errors.Wrap(err, "failed validating alive message")
453+
}
454+
if err := validateStateInfoMessage(stateInfMsg); err != nil {
455+
return nil, errors.Wrap(err, "failed validating stateInfo message")
456+
}
457+
sID := &msp.SerializedIdentity{}
458+
if err := proto.Unmarshal(peer.Identity, sID); err != nil {
431459
return nil, errors.Wrap(err, "failed unmarshaling peer's identity")
432460
}
433461
return &Peer{
434462
Identity: peer.Identity,
435463
StateInfoMessage: stateInfMsg,
436464
AliveMessage: aliveMsg,
437-
MSPID: sId.Mspid,
465+
MSPID: sID.Mspid,
438466
}, nil
439467
}
440468

441-
func randomEndorsers(count int, totalPeers []*Peer) Endorsers {
442-
var endorsers []*Peer
443-
for _, index := range util.GetRandomIndices(count, len(totalPeers)-1) {
444-
endorsers = append(endorsers, totalPeers[index])
445-
}
446-
return endorsers
447-
}
448-
449469
type endorsementDescriptor struct {
450470
endorsersByGroups map[string][]*Peer
451471
layouts []map[string]int
452472
}
453473

454-
func NewClient(createConnection Dialer, authInfo *discovery.AuthInfo, s Signer) *client {
455-
return &client{
474+
// NewClient creates a new Client instance
475+
func NewClient(createConnection Dialer, authInfo *discovery.AuthInfo, s Signer) *Client {
476+
return &Client{
456477
createConnection: createConnection,
457478
authInfo: authInfo,
458479
signRequest: s,
459480
}
460481
}
482+
483+
func validateAliveMessage(message *gossip.SignedGossipMessage) error {
484+
am := message.GetAliveMsg()
485+
if am == nil {
486+
return errors.New("message isn't an alive message")
487+
}
488+
m := am.Membership
489+
if m == nil {
490+
return errors.New("membership is empty")
491+
}
492+
if am.Timestamp == nil {
493+
return errors.New("timestamp is nil")
494+
}
495+
return nil
496+
}
497+
498+
func validateStateInfoMessage(message *gossip.SignedGossipMessage) error {
499+
si := message.GetStateInfo()
500+
if si == nil {
501+
return errors.New("message isn't a stateInfo message")
502+
}
503+
if si.Timestamp == nil {
504+
return errors.New("timestamp is nil")
505+
}
506+
if si.Properties == nil {
507+
return errors.New("properties is nil")
508+
}
509+
return nil
510+
}

discovery/client/client_test.go

+63-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"path/filepath"
1616
"strconv"
1717
"testing"
18+
"time"
1819

1920
"github.com/golang/protobuf/proto"
2021
"github.com/hyperledger/fabric/common/cauthdsl"
@@ -307,7 +308,7 @@ func TestClient(t *testing.T) {
307308
assert.Equal(t, ErrNotFound, err)
308309
assert.Nil(t, peers)
309310

310-
endorsers, err := fakeChannel.Endorsers("mycc")
311+
endorsers, err := fakeChannel.Endorsers("mycc", Selector{})
311312
assert.Equal(t, ErrNotFound, err)
312313
assert.Nil(t, endorsers)
313314

@@ -327,7 +328,7 @@ func TestClient(t *testing.T) {
327328
// We should see all peers as provided above
328329
assert.Len(t, peers, 8)
329330

330-
endorsers, err = mychannel.Endorsers("mycc")
331+
endorsers, err = mychannel.Endorsers("mycc", Selector{})
331332
// However, since we didn't provide any chaincodes to these peers - the server shouldn't
332333
// be able to construct the descriptor.
333334
// Just check that the appropriate error is returned, and nothing crashes.
@@ -347,7 +348,7 @@ func TestClient(t *testing.T) {
347348
assert.Len(t, peers, 8)
348349

349350
// We should get a valid endorsement descriptor from the service
350-
endorsers, err = mychannel.Endorsers("mycc")
351+
endorsers, err = mychannel.Endorsers("mycc", Selector{})
351352
assert.NoError(t, err)
352353
// The combinations of endorsers should be in the expected combinations
353354
assert.Contains(t, expectedOrgCombinations, getMSPs(endorsers))
@@ -440,7 +441,7 @@ func TestBadResponses(t *testing.T) {
440441
r, err = cl.Send(ctx, req)
441442
assert.NoError(t, err)
442443
mychannel := r.ForChannel("mychannel")
443-
endorsers, err := mychannel.Endorsers("mycc")
444+
endorsers, err := mychannel.Endorsers("mycc", Selector{})
444445
assert.Nil(t, endorsers)
445446
assert.Equal(t, ErrNotFound, err)
446447

@@ -472,9 +473,9 @@ func TestBadResponses(t *testing.T) {
472473
r, err = cl.Send(ctx, req)
473474
assert.NoError(t, err)
474475
mychannel = r.ForChannel("mychannel")
475-
endorsers, err = mychannel.Endorsers("mycc")
476+
endorsers, err = mychannel.Endorsers("mycc", Selector{})
476477
assert.Nil(t, endorsers)
477-
assert.Contains(t, err.Error(), "layout has a group that requires at least 2 peers, but only 0 peers are known")
478+
assert.Contains(t, err.Error(), "no endorsement combination can be satisfied")
478479

479480
// Scenario VI: discovery service sends back a layout that has a group that doesn't have a matching peer set
480481
svc.On("Discover").Return(&discovery.Response{
@@ -491,6 +492,52 @@ func TestBadResponses(t *testing.T) {
491492
assert.Empty(t, r)
492493
}
493494

495+
func TestValidateAliveMessage(t *testing.T) {
496+
am := aliveMessage(1)
497+
msg, _ := am.ToGossipMessage()
498+
499+
// Scenario I: Valid alive message
500+
assert.NoError(t, validateAliveMessage(msg))
501+
502+
// Scenario II: Nullify timestamp
503+
msg.GetAliveMsg().Timestamp = nil
504+
err := validateAliveMessage(msg)
505+
assert.Equal(t, "timestamp is nil", err.Error())
506+
507+
// Scenario III: Nullify membership
508+
msg.GetAliveMsg().Membership = nil
509+
err = validateAliveMessage(msg)
510+
assert.Equal(t, "membership is empty", err.Error())
511+
512+
// Scenario IV: Nullify the entire alive message part
513+
msg.Content = nil
514+
err = validateAliveMessage(msg)
515+
assert.Equal(t, "message isn't an alive message", err.Error())
516+
517+
}
518+
519+
func TestValidateStateInfoMessage(t *testing.T) {
520+
si := stateInfoWithHeight(100)
521+
522+
// Scenario I: Valid state info message
523+
assert.NoError(t, validateStateInfoMessage(si))
524+
525+
// Scenario II: Nullify properties
526+
si.GetStateInfo().Properties = nil
527+
err := validateStateInfoMessage(si)
528+
assert.Equal(t, "properties is nil", err.Error())
529+
530+
// Scenario III: Nullify timestamp
531+
si.GetStateInfo().Timestamp = nil
532+
err = validateStateInfoMessage(si)
533+
assert.Equal(t, "timestamp is nil", err.Error())
534+
535+
// Scenario IV: Nullify the state info message part
536+
si.Content = nil
537+
err = validateStateInfoMessage(si)
538+
assert.Equal(t, "message isn't a stateInfo message", err.Error())
539+
}
540+
494541
func getMSP(peer *Peer) string {
495542
endpoint := peer.AliveMessage.GetAliveMsg().Membership.Endpoint
496543
id, _ := strconv.ParseInt(endpoint[1:], 10, 64)
@@ -573,11 +620,11 @@ func (ip *inquireablePolicy) SatisfiedBy() []policies.PrincipalSet {
573620

574621
func peerIdentity(mspID string, i int) api.PeerIdentityInfo {
575622
p := []byte(fmt.Sprintf("p%d", i))
576-
sId := &msp.SerializedIdentity{
623+
sID := &msp.SerializedIdentity{
577624
Mspid: mspID,
578625
IdBytes: p,
579626
}
580-
b, _ := proto.Marshal(sId)
627+
b, _ := proto.Marshal(sID)
581628
return api.PeerIdentityInfo{
582629
Identity: api.PeerIdentityType(b),
583630
PKIId: gossipcommon.PKIidType(p),
@@ -595,6 +642,10 @@ func aliveMessage(id int) *gossip.Envelope {
595642
g := &gossip.GossipMessage{
596643
Content: &gossip.GossipMessage_AliveMsg{
597644
AliveMsg: &gossip.AliveMessage{
645+
Timestamp: &gossip.PeerTime{
646+
SeqNum: uint64(id),
647+
IncNum: uint64(time.Now().UnixNano()),
648+
},
598649
Membership: &gossip.Member{
599650
Endpoint: fmt.Sprintf("p%d", id),
600651
},
@@ -609,6 +660,10 @@ func stateInfoMessage(chaincodes ...*gossip.Chaincode) *gossip.Envelope {
609660
g := &gossip.GossipMessage{
610661
Content: &gossip.GossipMessage_StateInfo{
611662
StateInfo: &gossip.StateInfo{
663+
Timestamp: &gossip.PeerTime{
664+
SeqNum: 5,
665+
IncNum: uint64(time.Now().UnixNano()),
666+
},
612667
Properties: &gossip.Properties{
613668
Chaincodes: chaincodes,
614669
},

0 commit comments

Comments
 (0)