Skip to content

Commit 11bba21

Browse files
manish-sethidenyeart
authored andcommitted
[FAB-10617] Add writeset validation check during commit
Fabric version 1.1, missed one check in the couchdb key-values during simulation. Couchdb does not allow an underscore as a first character in the key or first-level fields name in the JSON value. The implication of this is that during commit, when the write-set is applied to the couchdb, the peer panic happens. The problem is severe because this brings network at halt. This is because, upon peer startup, peer again tries to process the last block and hence gets into an endless cycle of panic and restart. This CR adds a check in the commit path for the validity of the write-set and marks the transaction as invalid. Change-Id: I36070bdba0a5012d6e99bc311e84be13863e7d8b Signed-off-by: manish <manish.sethi@gmail.com> (cherry picked from commit 5d8a35e)
1 parent 29a7d95 commit 11bba21

File tree

8 files changed

+176
-77
lines changed

8 files changed

+176
-77
lines changed

core/ledger/kvledger/txmgmt/statedb/statedb.go

+3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type VersionedDB interface {
4545
GetLatestSavePoint() (*version.Height, error)
4646
// ValidateKeyValue tests whether the key and value is supported by the db implementation.
4747
// For instance, leveldb supports any bytes for the key while the couchdb supports only valid utf-8 string
48+
// TODO make the function ValidateKeyValue return a specific error say ErrInvalidKeyValue
49+
// However, as of now, the both implementations of this function (leveldb and couchdb) are deterministic in returing an error
50+
// i.e., an error is returned only if the key-value are found to be invalid for the underlying db
4851
ValidateKeyValue(key string, value []byte) error
4952
// BytesKeySuppoted returns true if the implementation (underlying db) supports the any bytes to be used as key.
5053
// For instance, leveldb supports any bytes for the key while the couchdb supports only valid utf-8 string

core/ledger/kvledger/txmgmt/validator/valimpl/default_impl.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,14 @@ var logger = flogging.MustGetLogger("valimpl")
2424
// valinternal.InternalValidator) such as statebased validator
2525
type DefaultImpl struct {
2626
txmgr txmgr.TxMgr
27+
db privacyenabledstate.DB
2728
valinternal.InternalValidator
2829
}
2930

3031
// NewStatebasedValidator constructs a validator that internally manages statebased validator and in addition
3132
// handles the tasks that are agnostic to a particular validation scheme such as parsing the block and handling the pvt data
3233
func NewStatebasedValidator(txmgr txmgr.TxMgr, db privacyenabledstate.DB) validator.Validator {
33-
return &DefaultImpl{txmgr, statebasedval.NewValidator(db)}
34+
return &DefaultImpl{txmgr, db, statebasedval.NewValidator(db)}
3435
}
3536

3637
// ValidateAndPrepareBatch implements the function in interface validator.Validator
@@ -44,7 +45,7 @@ func (impl *DefaultImpl) ValidateAndPrepareBatch(blockAndPvtdata *ledger.BlockAn
4445
var err error
4546

4647
logger.Debug("preprocessing ProtoBlock...")
47-
if internalBlock, err = preprocessProtoBlock(impl.txmgr, block, doMVCCValidation); err != nil {
48+
if internalBlock, err = preprocessProtoBlock(impl.txmgr, impl.db.ValidateKeyValue, block, doMVCCValidation); err != nil {
4849
return nil, err
4950
}
5051

core/ledger/kvledger/txmgmt/validator/valimpl/helper.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func validatePvtdata(tx *valinternal.Transaction, pvtdata *ledger.TxPvtData) err
8888

8989
// preprocessProtoBlock parses the proto instance of block into 'Block' structure.
9090
// The retuned 'Block' structure contains only transactions that are endorser transactions and are not alredy marked as invalid
91-
func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block, doMVCCValidation bool) (*valinternal.Block, error) {
91+
func preprocessProtoBlock(txmgr txmgr.TxMgr, validateKVFunc func(key string, value []byte) error,
92+
block *common.Block, doMVCCValidation bool) (*valinternal.Block, error) {
9293
b := &valinternal.Block{Num: block.Header.Number}
9394
// Committer validator has already set validation flags based on well formed tran checks
9495
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
@@ -145,6 +146,14 @@ func preprocessProtoBlock(txmgr txmgr.TxMgr, block *common.Block, doMVCCValidati
145146
}
146147
}
147148
if txRWSet != nil {
149+
if err := validateWriteset(txRWSet, validateKVFunc); err != nil {
150+
logger.Warningf("Channel [%s]: Block [%d] Transaction index [%d] TxId [%s]"+
151+
" marked as invalid. Reason code [%s]. Message: [%s]",
152+
chdr.GetChannelId(), block.Header.Number, txIndex, chdr.GetTxId(),
153+
peer.TxValidationCode_INVALID_WRITESET, err.Error())
154+
txsFilter.SetFlag(txIndex, peer.TxValidationCode_INVALID_WRITESET)
155+
continue
156+
}
148157
b.Txs = append(b.Txs, &valinternal.Transaction{IndexInBlock: txIndex, ID: chdr.TxId, RWSet: txRWSet})
149158
}
150159
}
@@ -175,6 +184,24 @@ func processNonEndorserTx(txEnv *common.Envelope, txid string, txType common.Hea
175184
return simRes.PubSimulationResults, nil
176185
}
177186

187+
func validateWriteset(txRWSet *rwsetutil.TxRwSet, validateKVFunc func(key string, value []byte) error) error {
188+
for _, nsRwSet := range txRWSet.NsRwSets {
189+
if nsRwSet == nil {
190+
continue
191+
}
192+
pubWriteset := nsRwSet.KvRwSet
193+
if pubWriteset == nil {
194+
continue
195+
}
196+
for _, kvwrite := range pubWriteset.Writes {
197+
if err := validateKVFunc(kvwrite.Key, kvwrite.Value); err != nil {
198+
return err
199+
}
200+
}
201+
}
202+
return nil
203+
}
204+
178205
// postprocessProtoBlock updates the proto block's validation flags (in metadata) by the results of validation process
179206
func postprocessProtoBlock(block *common.Block, validatedBlock *valinternal.Block) {
180207
txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

core/ledger/kvledger/txmgmt/validator/valimpl/helper_test.go

+50-9
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
1818
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valinternal"
1919
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
20+
lgrutil "github.com/hyperledger/fabric/core/ledger/util"
2021
lutils "github.com/hyperledger/fabric/core/ledger/util"
2122
"github.com/hyperledger/fabric/protos/common"
2223
"github.com/hyperledger/fabric/protos/peer"
@@ -117,8 +118,10 @@ func TestValidateAndPreparePvtBatch(t *testing.T) {
117118
tx3TxRWSet, err := rwsetutil.TxRwSetFromProtoMsg(tx3SimulationResults.PubSimulationResults)
118119
assert.NoError(t, err)
119120
expectedPerProcessedBlock.Txs = append(expectedPerProcessedBlock.Txs, &valinternal.Transaction{IndexInBlock: 2, ID: "tx3", RWSet: tx3TxRWSet})
120-
121-
actualPreProcessedBlock, err := preprocessProtoBlock(nil, block, false)
121+
alwaysValidKVFunc := func(key string, value []byte) error {
122+
return nil
123+
}
124+
actualPreProcessedBlock, err := preprocessProtoBlock(nil, alwaysValidKVFunc, block, false)
122125
assert.NoError(t, err)
123126
assert.Equal(t, expectedPerProcessedBlock, actualPreProcessedBlock)
124127

@@ -149,25 +152,27 @@ func TestValidateAndPreparePvtBatch(t *testing.T) {
149152
}
150153

151154
func TestPreprocessProtoBlock(t *testing.T) {
152-
155+
allwaysValidKVfunc := func(key string, value []byte) error {
156+
return nil
157+
}
153158
// good block
154159
//_, gb := testutil.NewBlockGenerator(t, "testLedger", false)
155160
gb := testutil.ConstructTestBlock(t, 10, 1, 1)
156-
_, err := preprocessProtoBlock(nil, gb, false)
161+
_, err := preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
157162
assert.NoError(t, err)
158163
// bad envelope
159164
gb = testutil.ConstructTestBlock(t, 11, 1, 1)
160165
gb.Data = &common.BlockData{Data: [][]byte{{123}}}
161166
gb.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] =
162167
lutils.NewTxValidationFlagsSetValue(len(gb.Data.Data), peer.TxValidationCode_VALID)
163-
_, err = preprocessProtoBlock(nil, gb, false)
168+
_, err = preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
164169
assert.Error(t, err)
165170
t.Log(err)
166171
// bad payload
167172
gb = testutil.ConstructTestBlock(t, 12, 1, 1)
168173
envBytes, _ := putils.GetBytesEnvelope(&common.Envelope{Payload: []byte{123}})
169174
gb.Data = &common.BlockData{Data: [][]byte{envBytes}}
170-
_, err = preprocessProtoBlock(nil, gb, false)
175+
_, err = preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
171176
assert.Error(t, err)
172177
t.Log(err)
173178
// bad channel header
@@ -177,7 +182,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
177182
})
178183
envBytes, _ = putils.GetBytesEnvelope(&common.Envelope{Payload: payloadBytes})
179184
gb.Data = &common.BlockData{Data: [][]byte{envBytes}}
180-
_, err = preprocessProtoBlock(nil, gb, false)
185+
_, err = preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
181186
assert.Error(t, err)
182187
t.Log(err)
183188

@@ -191,7 +196,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
191196
flags := lutils.NewTxValidationFlags(len(gb.Data.Data))
192197
flags.SetFlag(0, peer.TxValidationCode_BAD_CHANNEL_HEADER)
193198
gb.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = flags
194-
_, err = preprocessProtoBlock(nil, gb, false)
199+
_, err = preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
195200
assert.NoError(t, err) // invalid filter should take precendence
196201

197202
// new block
@@ -205,7 +210,7 @@ func TestPreprocessProtoBlock(t *testing.T) {
205210
// set logging backend for test
206211
backend := logging.NewMemoryBackend(1)
207212
logging.SetBackend(backend)
208-
_, err = preprocessProtoBlock(nil, gb, false)
213+
_, err = preprocessProtoBlock(nil, allwaysValidKVfunc, gb, false)
209214
assert.NoError(t, err)
210215
expected := fmt.Sprintf("Channel [%s]: Block [%d] Transaction index [%d] TxId [%s]"+
211216
" marked as invalid by committer. Reason code [%s]",
@@ -217,6 +222,42 @@ func TestPreprocessProtoBlock(t *testing.T) {
217222

218223
}
219224

225+
func TestPreprocessProtoBlockInvalidWriteset(t *testing.T) {
226+
kvValidationFunc := func(key string, value []byte) error {
227+
if value[0] == '_' {
228+
return fmt.Errorf("value [%s] found to be invalid by 'kvValidationFunc for testing'", value)
229+
}
230+
return nil
231+
}
232+
233+
rwSetBuilder := rwsetutil.NewRWSetBuilder()
234+
rwSetBuilder.AddToWriteSet("ns", "key", []byte("_invalidValue")) // bad value
235+
simulation1, err := rwSetBuilder.GetTxSimulationResults()
236+
assert.NoError(t, err)
237+
simulation1Bytes, err := simulation1.GetPubSimulationBytes()
238+
assert.NoError(t, err)
239+
240+
rwSetBuilder = rwsetutil.NewRWSetBuilder()
241+
rwSetBuilder.AddToWriteSet("ns", "key", []byte("validValue")) // good value
242+
simulation2, err := rwSetBuilder.GetTxSimulationResults()
243+
assert.NoError(t, err)
244+
simulation2Bytes, err := simulation2.GetPubSimulationBytes()
245+
assert.NoError(t, err)
246+
247+
block := testutil.ConstructBlock(t, 1, testutil.ConstructRandomBytes(t, 32),
248+
[][]byte{simulation1Bytes, simulation2Bytes}, false) // block with two txs
249+
txfilter := lgrutil.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
250+
assert.True(t, txfilter.IsValid(0))
251+
assert.True(t, txfilter.IsValid(1)) // both txs are valid initially at the time of block cutting
252+
253+
internalBlock, err := preprocessProtoBlock(nil, kvValidationFunc, block, false)
254+
assert.NoError(t, err)
255+
assert.False(t, txfilter.IsValid(0)) // tx at index 0 should be marked as invalid
256+
assert.True(t, txfilter.IsValid(1)) // tx at index 1 should be marked as valid
257+
assert.Len(t, internalBlock.Txs, 1)
258+
assert.Equal(t, internalBlock.Txs[0].IndexInBlock, 1)
259+
}
260+
220261
// from go-logging memory_test.go
221262
func memoryRecordN(b *logging.MemoryBackend, n int) *logging.Record {
222263
node := b.Head()

core/ledger/ledgerstorage/store.go

+31-8
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,18 @@ import (
2020
"fmt"
2121
"sync"
2222

23-
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
24-
23+
"github.com/hyperledger/fabric/common/flogging"
2524
"github.com/hyperledger/fabric/common/ledger/blkstorage"
2625
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
2726
"github.com/hyperledger/fabric/core/ledger"
2827
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
28+
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
2929
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
3030
"github.com/hyperledger/fabric/protos/common"
3131
)
3232

33+
var logger = flogging.MustGetLogger("ledgerstorage")
34+
3335
// Provider encapusaltes two providers 1) block store provider and 2) and pvt data store provider
3436
type Provider struct {
3537
blkStoreProvider blkstorage.BlockStoreProvider
@@ -95,20 +97,41 @@ func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
9597

9698
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
9799
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
100+
blockNum := blockAndPvtdata.Block.Header.Number
98101
s.rwlock.Lock()
99102
defer s.rwlock.Unlock()
100-
var pvtdata []*ledger.TxPvtData
101-
for _, v := range blockAndPvtdata.BlockPvtData {
102-
pvtdata = append(pvtdata, v)
103-
}
104-
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata); err != nil {
103+
104+
pvtBlkStoreHt, err := s.pvtdataStore.LastCommittedBlockHeight()
105+
if err != nil {
105106
return err
106107
}
108+
109+
writtenToPvtStore := false
110+
if pvtBlkStoreHt < blockNum+1 { // The pvt data store sanity check does not allow rewriting the pvt data.
111+
// when re-processing blocks (rejoin the channel or re-fetching last few block),
112+
// skip the pvt data commit to the pvtdata blockstore
113+
logger.Debugf("Writing block [%d] to pvt block store", blockNum)
114+
var pvtdata []*ledger.TxPvtData
115+
for _, v := range blockAndPvtdata.BlockPvtData {
116+
pvtdata = append(pvtdata, v)
117+
}
118+
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata); err != nil {
119+
return err
120+
}
121+
writtenToPvtStore = true
122+
} else {
123+
logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtBlkStoreHt)
124+
}
125+
107126
if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
108127
s.pvtdataStore.Rollback()
109128
return err
110129
}
111-
return s.pvtdataStore.Commit()
130+
131+
if writtenToPvtStore {
132+
return s.pvtdataStore.Commit()
133+
}
134+
return nil
112135
}
113136

114137
// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.

core/ledger/ledgerstorage/store_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,9 @@ func TestAddAfterPvtdataStoreError(t *testing.T) {
241241
for _, d := range sampleData[0:9] {
242242
assert.NoError(t, store.CommitWithPvtData(d))
243243
}
244-
// try to write the last block again. The function should pass on the error raised by the private store
245-
err = store.CommitWithPvtData(sampleData[8])
246-
_, ok := err.(*pvtdatastorage.ErrIllegalArgs)
247-
assert.True(t, ok)
244+
// try to write the last block again. The function should skip adding block to the private store
245+
// as the pvt store but the block storage should return error
246+
assert.Error(t, store.CommitWithPvtData(sampleData[8]))
248247

249248
// At the end, the pvt store status should not have changed
250249
pvtStoreCommitHt, err := store.pvtdataStore.LastCommittedBlockHeight()

0 commit comments

Comments
 (0)