Skip to content

Commit 7de873c

Browse files
committedJan 18, 2018
[FAB-7676] Ignore read-only collections preimage pull
Currently, when the coordinator parses the block, it doesn't distinguish between collections that have no private writes and collections that do. This results in gossip attempting to retrieve from the transient store, or from other peers - preimages for read-only collection RWSets, which don't exist anywhere because they're not persisted in the transient store, and as a result - the attempt times out. This change set makes the logic skip collections that are read-only. Change-Id: If41d2e0ebe093d567c480d17d9b7e26263659498 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent a85b2fa commit 7de873c

File tree

3 files changed

+124
-17
lines changed

3 files changed

+124
-17
lines changed
 

‎gossip/privdata/coordinator.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -665,20 +665,23 @@ type transactionInspector struct {
665665

666666
func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) {
667667
for _, ns := range txRWSet.NsRwSets {
668-
for _, hashed := range ns.CollHashedRwSets {
669-
policy := bi.accessPolicyForCollection(chdr, ns.NameSpace, hashed.CollectionName)
668+
for _, hashedCollection := range ns.CollHashedRwSets {
669+
if !containsWrites(chdr.TxId, ns.NameSpace, hashedCollection) {
670+
continue
671+
}
672+
policy := bi.accessPolicyForCollection(chdr, ns.NameSpace, hashedCollection.CollectionName)
670673
if policy == nil {
671674
continue
672675
}
673-
if !bi.isEligible(policy, ns.NameSpace, hashed.CollectionName) {
676+
if !bi.isEligible(policy, ns.NameSpace, hashedCollection.CollectionName) {
674677
continue
675678
}
676679
key := rwSetKey{
677680
txID: chdr.TxId,
678681
seqInBlock: seqInBlock,
679-
hash: hex.EncodeToString(hashed.PvtRwSetHash),
682+
hash: hex.EncodeToString(hashedCollection.PvtRwSetHash),
680683
namespace: ns.NameSpace,
681-
collection: hashed.CollectionName,
684+
collection: hashedCollection.CollectionName,
682685
}
683686
bi.privateRWsetsInBlock[key] = struct{}{}
684687
if _, exists := bi.ownedRWsets[key]; !exists {
@@ -687,7 +690,7 @@ func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *comm
687690
seqInBlock: seqInBlock,
688691
}
689692
bi.missingKeys[txAndSeq] = append(bi.missingKeys[txAndSeq], key)
690-
bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashed.CollectionName, endorsers, policy.MemberOrgs())
693+
bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashedCollection.CollectionName, endorsers, policy.MemberOrgs())
691694
}
692695
} // for all hashed RW sets
693696
} // for all RW sets
@@ -814,3 +817,16 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common
814817

815818
return blockAndPvtData.Block, seqs2Namespaces.asPrivateData(), nil
816819
}
820+
821+
// containsWrites checks whether the given CollHashedRwSet contains writes
822+
func containsWrites(txID string, namespace string, colHashedRWSet *rwsetutil.CollHashedRwSet) bool {
823+
if colHashedRWSet.HashedRwSet == nil {
824+
logger.Warningf("HashedRWSet of tx %s, namespace %s, collection %s is nil", txID, namespace, colHashedRWSet.CollectionName)
825+
return false
826+
}
827+
if len(colHashedRWSet.HashedRwSet.HashedWrites) == 0 {
828+
logger.Debugf("HashedRWSet of tx %s, namespace %s, collection %s doesn't contain writes", txID, namespace, colHashedRWSet.CollectionName)
829+
return false
830+
}
831+
return true
832+
}

‎gossip/privdata/coordinator_test.go

+73-2
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ import (
1919
util2 "github.com/hyperledger/fabric/common/util"
2020
"github.com/hyperledger/fabric/core/common/privdata"
2121
"github.com/hyperledger/fabric/core/ledger"
22+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
2223
"github.com/hyperledger/fabric/core/transientstore"
2324
"github.com/hyperledger/fabric/gossip/util"
2425
"github.com/hyperledger/fabric/protos/common"
2526
proto "github.com/hyperledger/fabric/protos/gossip"
2627
"github.com/hyperledger/fabric/protos/ledger/rwset"
28+
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
2729
"github.com/hyperledger/fabric/protos/msp"
2830
"github.com/spf13/viper"
2931
"github.com/stretchr/testify/assert"
@@ -748,7 +750,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
748750
channelID: "test",
749751
}
750752

751-
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", "c1").create()
753+
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1").create()
752754
store.On("GetTxPvtRWSetByTxid", "tx1", mock.Anything).Return((&mockRWSetScanner{}).withRWSet("ns1", "c1"), nil)
753755

754756
coordinator := NewCoordinator(Support{
@@ -837,7 +839,8 @@ func TestCoordinatorStoreBlock(t *testing.T) {
837839
bf := &blockFactory{
838840
channelID: "test",
839841
}
840-
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", "c1", "c2").AddTxnWithEndorsement("tx2", "ns2", hash, "org2", "c1").create()
842+
block := bf.AddTxnWithEndorsement("tx1", "ns1", hash, "org1", true, "c1", "c2").
843+
AddTxnWithEndorsement("tx2", "ns2", hash, "org2", true, "c1").create()
841844

842845
// Scenario I: Block we got has sufficient private data alongside it.
843846
// If the coordinator tries fetching from the transientstore, or peers it would result in panic,
@@ -1223,3 +1226,71 @@ func TestCoordinatorStorePvtData(t *testing.T) {
12231226
assert.Error(t, err)
12241227
assert.Contains(t, err.Error(), "I/O error: file system full")
12251228
}
1229+
1230+
func TestContainsWrites(t *testing.T) {
1231+
// Scenario I: Nil HashedRwSet in collection
1232+
col := &rwsetutil.CollHashedRwSet{
1233+
CollectionName: "col1",
1234+
}
1235+
assert.False(t, containsWrites("tx", "ns", col))
1236+
1237+
// Scenario II: No writes in collection
1238+
col.HashedRwSet = &kvrwset.HashedRWSet{}
1239+
assert.False(t, containsWrites("tx", "ns", col))
1240+
1241+
// Scenario III: Some writes in collection
1242+
col.HashedRwSet.HashedWrites = append(col.HashedRwSet.HashedWrites, &kvrwset.KVWriteHash{})
1243+
assert.True(t, containsWrites("tx", "ns", col))
1244+
}
1245+
1246+
func TestIgnoreReadOnlyColRWSets(t *testing.T) {
1247+
// Scenario: The transaction has some ColRWSets that have only reads and no writes,
1248+
// These should be ignored and not considered as missing private data that needs to be retrieved
1249+
// from the transient store or other peers.
1250+
// The gossip and transient store mocks in this test aren't initialized with
1251+
// actions, so if the coordinator attempts to fetch private data from the
1252+
// transient store or other peers, the test would fail.
1253+
// Also - we check that at commit time - the coordinator concluded that
1254+
// no missing private data was found.
1255+
peerSelfSignedData := common.SignedData{
1256+
Identity: []byte{0, 1, 2},
1257+
Signature: []byte{3, 4, 5},
1258+
Data: []byte{6, 7, 8},
1259+
}
1260+
cs := createcollectionStore(peerSelfSignedData).thatAcceptsAll()
1261+
var commitHappened bool
1262+
assertCommitHappened := func() {
1263+
assert.True(t, commitHappened)
1264+
commitHappened = false
1265+
}
1266+
committer := &committerMock{}
1267+
committer.On("CommitWithPvtData", mock.Anything).Run(func(args mock.Arguments) {
1268+
blockAndPrivateData := args.Get(0).(*ledger.BlockAndPvtData)
1269+
// Ensure there is no private data to commit
1270+
assert.Empty(t, blockAndPrivateData.BlockPvtData)
1271+
// Ensure there is no missing private data
1272+
assert.Empty(t, blockAndPrivateData.Missing)
1273+
commitHappened = true
1274+
}).Return(nil)
1275+
store := &mockTransientStore{t: t}
1276+
1277+
fetcher := &fetcherMock{t: t}
1278+
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
1279+
bf := &blockFactory{
1280+
channelID: "test",
1281+
}
1282+
// The block contains a read only private data transaction
1283+
block := bf.AddReadOnlyTxn("tx1", "ns3", hash, "c3", "c2").create()
1284+
coordinator := NewCoordinator(Support{
1285+
CollectionStore: cs,
1286+
Committer: committer,
1287+
Fetcher: fetcher,
1288+
TransientStore: store,
1289+
Validator: &validatorMock{},
1290+
}, peerSelfSignedData)
1291+
// We pass a nil private data slice to indicate no pre-images though the block contains
1292+
// private data reads.
1293+
err := coordinator.StoreBlock(block, nil)
1294+
assert.NoError(t, err)
1295+
assertCommitHappened()
1296+
}

‎gossip/privdata/util.go

+29-9
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,25 @@ type blockFactory struct {
3131
}
3232

3333
func (bf *blockFactory) AddTxn(txID string, nsName string, hash []byte, collections ...string) *blockFactory {
34-
return bf.AddTxnWithEndorsement(txID, nsName, hash, "", collections...)
34+
return bf.AddTxnWithEndorsement(txID, nsName, hash, "", true, collections...)
3535
}
3636

37-
func (bf *blockFactory) AddTxnWithEndorsement(txID string, nsName string, hash []byte, org string, collections ...string) *blockFactory {
37+
func (bf *blockFactory) AddReadOnlyTxn(txID string, nsName string, hash []byte, collections ...string) *blockFactory {
38+
return bf.AddTxnWithEndorsement(txID, nsName, hash, "", false, collections...)
39+
}
40+
41+
func (bf *blockFactory) AddTxnWithEndorsement(txID string, nsName string, hash []byte, org string, hasWrites bool, collections ...string) *blockFactory {
3842
txn := &peer.Transaction{
3943
Actions: []*peer.TransactionAction{
4044
{},
4145
},
4246
}
47+
nsRWSet := sampleNsRwSet(nsName, hash, collections...)
48+
if !hasWrites {
49+
nsRWSet = sampleReadOnlyNsRwSet(nsName, hash, collections...)
50+
}
4351
txrws := rwsetutil.TxRwSet{
44-
NsRwSets: []*rwsetutil.NsRwSet{sampleNsRwSet(nsName, hash, collections...)},
52+
NsRwSets: []*rwsetutil.NsRwSet{nsRWSet},
4553
}
4654

4755
b, err := txrws.ToProtoBytes()
@@ -167,7 +175,17 @@ func sampleNsRwSet(ns string, hash []byte, collections ...string) *rwsetutil.NsR
167175
KvRwSet: sampleKvRwSet(),
168176
}
169177
for _, col := range collections {
170-
nsRwSet.CollHashedRwSets = append(nsRwSet.CollHashedRwSets, sampleCollHashedRwSet(col, hash))
178+
nsRwSet.CollHashedRwSets = append(nsRwSet.CollHashedRwSets, sampleCollHashedRwSet(col, hash, true))
179+
}
180+
return nsRwSet
181+
}
182+
183+
func sampleReadOnlyNsRwSet(ns string, hash []byte, collections ...string) *rwsetutil.NsRwSet {
184+
nsRwSet := &rwsetutil.NsRwSet{NameSpace: ns,
185+
KvRwSet: sampleKvRwSet(),
186+
}
187+
for _, col := range collections {
188+
nsRwSet.CollHashedRwSets = append(nsRwSet.CollHashedRwSets, sampleCollHashedRwSet(col, hash, false))
171189
}
172190
return nsRwSet
173191
}
@@ -188,21 +206,23 @@ func sampleKvRwSet() *kvrwset.KVRWSet {
188206
}
189207
}
190208

191-
func sampleCollHashedRwSet(collectionName string, hash []byte) *rwsetutil.CollHashedRwSet {
209+
func sampleCollHashedRwSet(collectionName string, hash []byte, hasWrites bool) *rwsetutil.CollHashedRwSet {
192210
collHashedRwSet := &rwsetutil.CollHashedRwSet{
193211
CollectionName: collectionName,
194212
HashedRwSet: &kvrwset.HashedRWSet{
195213
HashedReads: []*kvrwset.KVReadHash{
196214
{KeyHash: []byte("Key-1-hash"), Version: &kvrwset.Version{BlockNum: 1, TxNum: 2}},
197215
{KeyHash: []byte("Key-2-hash"), Version: &kvrwset.Version{BlockNum: 2, TxNum: 3}},
198216
},
199-
HashedWrites: []*kvrwset.KVWriteHash{
200-
{KeyHash: []byte("Key-3-hash"), ValueHash: []byte("value-3-hash"), IsDelete: false},
201-
{KeyHash: []byte("Key-4-hash"), ValueHash: []byte("value-4-hash"), IsDelete: true},
202-
},
203217
},
204218
PvtRwSetHash: hash,
205219
}
220+
if hasWrites {
221+
collHashedRwSet.HashedRwSet.HashedWrites = []*kvrwset.KVWriteHash{
222+
{KeyHash: []byte("Key-3-hash"), ValueHash: []byte("value-3-hash"), IsDelete: false},
223+
{KeyHash: []byte("Key-4-hash"), ValueHash: []byte("value-4-hash"), IsDelete: true},
224+
}
225+
}
206226
return collHashedRwSet
207227
}
208228

0 commit comments

Comments
 (0)
Please sign in to comment.