Skip to content

Commit 3f1feae

Browse files
committed
[FAB-10366]: Prevent pulling form peer purged data
While seeking for missing private data lagging behind peer could try to pull peaces from the peers which already purged it due to BTL policy. This in particular might be a problem while joining a new peer to the network after a while. This commit adds routing filter to prevent from pulling private data from peers which already purged it, thus not wasting time while committing block. Change-Id: I8ccfc186d69c12e5ae933455640b721454a2e80a Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent 4db57a3 commit 3f1feae

File tree

5 files changed

+341
-114
lines changed

5 files changed

+341
-114
lines changed

gossip/privdata/coordinator.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,17 @@ func (d2s dig2sources) keys() []*gossip2.PvtDataDigest {
104104
return res
105105
}
106106

107+
// FetchedPvtDataContainer container for pvt data elements
108+
// returned by Fetcher
109+
type FetchedPvtDataContainer struct {
110+
AvailableElemenets []*gossip2.PvtDataElement
111+
PurgedElements []*gossip2.PvtDataDigest
112+
}
113+
107114
// Fetcher interface which defines API to fetch missing
108115
// private data elements
109116
type Fetcher interface {
110-
fetch(dig2src dig2sources) ([]*gossip2.PvtDataElement, error)
117+
fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataContainer, error)
111118
}
112119

113120
// Support encapsulates set of interfaces to
@@ -262,14 +269,14 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][
262269
}
263270
dig2src[dig] = privateInfo.sources[k]
264271
})
265-
fetchedData, err := c.fetch(dig2src)
272+
fetchedData, err := c.fetch(dig2src, blockSeq)
266273
if err != nil {
267274
logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)
268275
return
269276
}
270277

271278
// Iterate over data fetched from peers
272-
for _, element := range fetchedData {
279+
for _, element := range fetchedData.AvailableElemenets {
273280
dig := element.Digest
274281
for _, rws := range element.Payload {
275282
hash := hex.EncodeToString(util2.ComputeSHA256(rws))
@@ -292,6 +299,18 @@ func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][
292299
logger.Debug("Fetched", key)
293300
}
294301
}
302+
// Iterate over purged data
303+
for _, dig := range fetchedData.PurgedElements {
304+
// delete purged key from missing keys
305+
for missingPvtRWKey := range privateInfo.missingKeys {
306+
if missingPvtRWKey.namespace == dig.Namespace &&
307+
missingPvtRWKey.collection == dig.Collection &&
308+
missingPvtRWKey.txID == dig.TxId {
309+
delete(privateInfo.missingKeys, missingPvtRWKey)
310+
logger.Debug(missingPvtRWKey, "was purged or will soon be purged, skipping fetch")
311+
}
312+
}
313+
}
295314
}
296315

297316
func (c *coordinator) fetchMissingFromTransientStore(missing rwSetKeysByTxIDs, ownedRWsets map[rwSetKey][]byte) {

gossip/privdata/coordinator_test.go

+51-44
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,7 @@ func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall
293293
}
294294
}
295295

296-
func (f *fetcherMock) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
297-
fmt.Println("XXX: Expected endorsers", f.expectedEndorsers)
296+
func (f *fetcherMock) fetch(dig2src dig2sources, _ uint64) (*FetchedPvtDataContainer, error) {
298297
for _, endorsements := range dig2src {
299298
for _, endorsement := range endorsements {
300299
_, exists := f.expectedEndorsers[string(endorsement.Endorser)]
@@ -309,7 +308,7 @@ func (f *fetcherMock) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error
309308
assert.Empty(f.t, f.expectedEndorsers)
310309
args := f.Called(dig2src)
311310
if args.Get(1) == nil {
312-
return args.Get(0).([]*proto.PvtDataElement), nil
311+
return args.Get(0).(*FetchedPvtDataContainer), nil
313312
}
314313
return nil, args.Get(1).(error)
315314
}
@@ -821,15 +820,17 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
821820
{
822821
TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
823822
},
824-
}).expectingEndorsers("org1").Return([]*proto.PvtDataElement{
825-
{
826-
Digest: &proto.PvtDataDigest{
827-
BlockSeq: 1,
828-
Collection: "c1",
829-
Namespace: "ns1",
830-
TxId: "tx1",
823+
}).expectingEndorsers("org1").Return(&FetchedPvtDataContainer{
824+
AvailableElemenets: []*proto.PvtDataElement{
825+
{
826+
Digest: &proto.PvtDataDigest{
827+
BlockSeq: 1,
828+
Collection: "c1",
829+
Namespace: "ns1",
830+
TxId: "tx1",
831+
},
832+
Payload: [][]byte{[]byte("rws-original")},
831833
},
832-
Payload: [][]byte{[]byte("rws-original")},
833834
},
834835
}, nil)
835836
store.On("Persist", mock.Anything, uint64(1), mock.Anything).
@@ -945,25 +946,27 @@ func TestCoordinatorStoreBlock(t *testing.T) {
945946
{
946947
TxId: "tx2", Namespace: "ns2", Collection: "c1", BlockSeq: 1, SeqInBlock: 1,
947948
},
948-
}).expectingEndorsers("org1").Return([]*proto.PvtDataElement{
949-
{
950-
Digest: &proto.PvtDataDigest{
951-
BlockSeq: 1,
952-
Collection: "c2",
953-
Namespace: "ns1",
954-
TxId: "tx1",
949+
}).expectingEndorsers("org1").Return(&FetchedPvtDataContainer{
950+
AvailableElemenets: []*proto.PvtDataElement{
951+
{
952+
Digest: &proto.PvtDataDigest{
953+
BlockSeq: 1,
954+
Collection: "c2",
955+
Namespace: "ns1",
956+
TxId: "tx1",
957+
},
958+
Payload: [][]byte{[]byte("rws-pre-image")},
955959
},
956-
Payload: [][]byte{[]byte("rws-pre-image")},
957-
},
958-
{
959-
Digest: &proto.PvtDataDigest{
960-
SeqInBlock: 1,
961-
BlockSeq: 1,
962-
Collection: "c1",
963-
Namespace: "ns2",
964-
TxId: "tx2",
960+
{
961+
Digest: &proto.PvtDataDigest{
962+
SeqInBlock: 1,
963+
BlockSeq: 1,
964+
Collection: "c1",
965+
Namespace: "ns2",
966+
TxId: "tx2",
967+
},
968+
Payload: [][]byte{[]byte("rws-pre-image")},
965969
},
966-
Payload: [][]byte{[]byte("rws-pre-image")},
967970
},
968971
}, nil)
969972
store.On("Persist", mock.Anything, uint64(1), mock.Anything).
@@ -994,15 +997,17 @@ func TestCoordinatorStoreBlock(t *testing.T) {
994997
{
995998
TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
996999
},
997-
}).Return([]*proto.PvtDataElement{
998-
{
999-
Digest: &proto.PvtDataDigest{
1000-
BlockSeq: 1,
1001-
Collection: "c3",
1002-
Namespace: "ns3",
1003-
TxId: "tx3",
1000+
}).Return(&FetchedPvtDataContainer{
1001+
AvailableElemenets: []*proto.PvtDataElement{
1002+
{
1003+
Digest: &proto.PvtDataDigest{
1004+
BlockSeq: 1,
1005+
Collection: "c3",
1006+
Namespace: "ns3",
1007+
TxId: "tx3",
1008+
},
1009+
Payload: [][]byte{[]byte("rws-pre-image")},
10041010
},
1005-
Payload: [][]byte{[]byte("rws-pre-image")},
10061011
},
10071012
}, nil)
10081013
store = &mockTransientStore{t: t}
@@ -1123,15 +1128,17 @@ func TestProceedWithoutPrivateData(t *testing.T) {
11231128
{
11241129
TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
11251130
},
1126-
}).Return([]*proto.PvtDataElement{
1127-
{
1128-
Digest: &proto.PvtDataDigest{
1129-
BlockSeq: 1,
1130-
Collection: "c2",
1131-
Namespace: "ns3",
1132-
TxId: "tx1",
1131+
}).Return(&FetchedPvtDataContainer{
1132+
AvailableElemenets: []*proto.PvtDataElement{
1133+
{
1134+
Digest: &proto.PvtDataDigest{
1135+
BlockSeq: 1,
1136+
Collection: "c2",
1137+
Namespace: "ns3",
1138+
TxId: "tx1",
1139+
},
1140+
Payload: [][]byte{[]byte("wrong pre-image")},
11331141
},
1134-
Payload: [][]byte{[]byte("wrong pre-image")},
11351142
},
11361143
}, nil)
11371144

0 commit comments

Comments
 (0)