Skip to content

Commit f21ec45

Browse files
manish-sethimastersingh24
authored andcommitted
[FAB-10513] Support pvtdata store from v1.1
This CR adds support for a pvtdata store that has pvt data from both v1.1 and v1.2. See jira for more details Change-Id: Icecc8ee085a097c439e205736759785b17818348 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 297279e commit f21ec45

File tree

10 files changed

+513
-0
lines changed

10 files changed

+513
-0
lines changed

common/ledger/testutil/test_util.go

+53
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import (
2222
"crypto/rand"
2323
"encoding/json"
2424
"fmt"
25+
"io"
26+
"os"
27+
"path/filepath"
2528
"reflect"
2629
"runtime"
2730
"testing"
@@ -202,3 +205,53 @@ func CreateTarBytesForTest(testFiles []*TarFileEntry) []byte {
202205
tarWriter.Close()
203206
return buffer.Bytes()
204207
}
208+
209+
// CopyDir creates a copy of a dir
210+
func CopyDir(srcroot, destroot string) error {
211+
_, lastSegment := filepath.Split(srcroot)
212+
destroot = filepath.Join(destroot, lastSegment)
213+
214+
walkFunc := func(srcpath string, info os.FileInfo, err error) error {
215+
srcsubpath, err := filepath.Rel(srcroot, srcpath)
216+
if err != nil {
217+
return err
218+
}
219+
destpath := filepath.Join(destroot, srcsubpath)
220+
221+
if info.IsDir() { // its a dir, make corresponding dir in the dest
222+
if err = os.MkdirAll(destpath, info.Mode()); err != nil {
223+
return err
224+
}
225+
return nil
226+
}
227+
228+
// its a file, copy to corresponding path in the dest
229+
if err = copyFile(srcpath, destpath); err != nil {
230+
return err
231+
}
232+
return nil
233+
}
234+
235+
return filepath.Walk(srcroot, walkFunc)
236+
}
237+
238+
func copyFile(srcpath, destpath string) error {
239+
var srcFile, destFile *os.File
240+
var err error
241+
if srcFile, err = os.Open(srcpath); err != nil {
242+
return err
243+
}
244+
if destFile, err = os.Create(destpath); err != nil {
245+
return err
246+
}
247+
if _, err = io.Copy(destFile, srcFile); err != nil {
248+
return err
249+
}
250+
if err = srcFile.Close(); err != nil {
251+
return err
252+
}
253+
if err = destFile.Close(); err != nil {
254+
return err
255+
}
256+
return nil
257+
}

core/ledger/pvtdatastorage/store_impl.go

+5
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func (p *provider) OpenStore(ledgerid string) (Store, error) {
7575
if err := s.initState(); err != nil {
7676
return nil, err
7777
}
78+
logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]",
79+
s.isEmpty, s.lastCommittedBlock, s.batchPending)
7880
return s, nil
7981
}
8082

@@ -197,6 +199,9 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
197199

198200
for itr.Next() {
199201
dataKeyBytes := itr.Key()
202+
if v11Format(dataKeyBytes) {
203+
return v11RetrievePvtdata(itr, filter)
204+
}
200205
dataValueBytes := itr.Value()
201206
dataKey := decodeDatakey(dataKeyBytes)
202207
expired, err := isExpired(dataKey, s.btlPolicy, s.lastCommittedBlock)
Binary file not shown.
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
MANIFEST-000065

core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/LOCK

Whitespace-only changes.

core/ledger/pvtdatastorage/testdata/v11_v12/ledgersData/pvtdataStore/LOG

+291
Large diffs are not rendered by default.
Binary file not shown.

core/ledger/pvtdatastorage/v11.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtdatastorage
8+
9+
import (
10+
"github.com/golang/protobuf/proto"
11+
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
12+
"github.com/hyperledger/fabric/core/ledger"
13+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
14+
"github.com/hyperledger/fabric/protos/ledger/rwset"
15+
)
16+
17+
func v11Format(datakeyBytes []byte) bool {
18+
_, n := version.NewHeightFromBytes(datakeyBytes[1:])
19+
remainingBytes := datakeyBytes[n+1:]
20+
return len(remainingBytes) == 0
21+
}
22+
23+
func v11DecodePK(key blkTranNumKey) (blockNum uint64, tranNum uint64) {
24+
height, _ := version.NewHeightFromBytes(key[1:])
25+
return height.BlockNum, height.TxNum
26+
}
27+
28+
func v11DecodePvtRwSet(encodedBytes []byte) (*rwset.TxPvtReadWriteSet, error) {
29+
writeset := &rwset.TxPvtReadWriteSet{}
30+
return writeset, proto.Unmarshal(encodedBytes, writeset)
31+
}
32+
33+
func v11RetrievePvtdata(itr *leveldbhelper.Iterator, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
34+
var blkPvtData []*ledger.TxPvtData
35+
txPvtData, err := v11DecodeKV(itr.Key(), itr.Value(), filter)
36+
if err != nil {
37+
return nil, err
38+
}
39+
blkPvtData = append(blkPvtData, txPvtData)
40+
for itr.Next() {
41+
pvtDatum, err := v11DecodeKV(itr.Key(), itr.Value(), filter)
42+
if err != nil {
43+
return nil, err
44+
}
45+
blkPvtData = append(blkPvtData, pvtDatum)
46+
}
47+
return blkPvtData, nil
48+
}
49+
50+
func v11DecodeKV(k, v []byte, filter ledger.PvtNsCollFilter) (*ledger.TxPvtData, error) {
51+
bNum, tNum := v11DecodePK(k)
52+
var pvtWSet *rwset.TxPvtReadWriteSet
53+
var err error
54+
if pvtWSet, err = v11DecodePvtRwSet(v); err != nil {
55+
return nil, err
56+
}
57+
logger.Debugf("Retrieved V11 private data write set for block [%d] tran [%d]", bNum, tNum)
58+
filteredWSet := v11TrimPvtWSet(pvtWSet, filter)
59+
return &ledger.TxPvtData{SeqInBlock: tNum, WriteSet: filteredWSet}, nil
60+
}
61+
62+
func v11TrimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet {
63+
if filter == nil {
64+
return pvtWSet
65+
}
66+
67+
var filteredNsRwSet []*rwset.NsPvtReadWriteSet
68+
for _, ns := range pvtWSet.NsPvtRwset {
69+
var filteredCollRwSet []*rwset.CollectionPvtReadWriteSet
70+
for _, coll := range ns.CollectionPvtRwset {
71+
if filter.Has(ns.Namespace, coll.CollectionName) {
72+
filteredCollRwSet = append(filteredCollRwSet, coll)
73+
}
74+
}
75+
if filteredCollRwSet != nil {
76+
filteredNsRwSet = append(filteredNsRwSet,
77+
&rwset.NsPvtReadWriteSet{
78+
Namespace: ns.Namespace,
79+
CollectionPvtRwset: filteredCollRwSet,
80+
},
81+
)
82+
}
83+
}
84+
var filteredTxPvtRwSet *rwset.TxPvtReadWriteSet
85+
if filteredNsRwSet != nil {
86+
filteredTxPvtRwSet = &rwset.TxPvtReadWriteSet{
87+
DataModel: pvtWSet.GetDataModel(),
88+
NsPvtRwset: filteredNsRwSet,
89+
}
90+
}
91+
return filteredTxPvtRwSet
92+
}
+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtdatastorage
8+
9+
import (
10+
"os"
11+
"testing"
12+
13+
"github.com/davecgh/go-spew/spew"
14+
"github.com/hyperledger/fabric/common/ledger/testutil"
15+
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
16+
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
17+
"github.com/spf13/viper"
18+
"github.com/stretchr/testify/assert"
19+
)
20+
21+
// TestV11v12 test that we are able to read the mixed format data (for v11 and v12)
22+
// from pvtdata store. This test used a pvt data store that is produced in one of the
23+
// upgrade tests. The store contains total 15 blocks. Block number one to nine has not
24+
// pvt data because, that time peer code was v1.0 and hence no pvt data. Block 10 contains
25+
// a pvtdata from peer v1.1. Block 11 - 13 has not pvt data. Block 14 has pvt data from peer v1.2
26+
func TestV11v12(t *testing.T) {
27+
testWorkingDir := "test-working-dir"
28+
testutil.CopyDir("testdata/v11_v12/ledgersData", testWorkingDir)
29+
defer os.RemoveAll(testWorkingDir)
30+
31+
viper.Set("peer.fileSystemPath", testWorkingDir)
32+
defer viper.Reset()
33+
34+
ledgerid := "ch1"
35+
cs := btltestutil.NewMockCollectionStore()
36+
cs.SetBTL("marbles_private", "collectionMarbles", 0)
37+
cs.SetBTL("marbles_private", "collectionMarblePrivateDetails", 0)
38+
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs)
39+
40+
p := NewProvider()
41+
defer p.Close()
42+
s, err := p.OpenStore(ledgerid)
43+
assert.NoError(t, err)
44+
s.Init(btlPolicy)
45+
46+
for blk := 0; blk < 10; blk++ {
47+
checkDataNotExists(t, s, blk)
48+
}
49+
checkDataExists(t, s, 10)
50+
for blk := 11; blk < 14; blk++ {
51+
checkDataNotExists(t, s, blk)
52+
}
53+
checkDataExists(t, s, 14)
54+
55+
_, err = s.GetPvtDataByBlockNum(uint64(15), nil)
56+
_, ok := err.(*ErrOutOfRange)
57+
assert.True(t, ok)
58+
}
59+
60+
func checkDataNotExists(t *testing.T, s Store, blkNum int) {
61+
data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil)
62+
assert.NoError(t, err)
63+
assert.Nil(t, data)
64+
}
65+
66+
func checkDataExists(t *testing.T, s Store, blkNum int) {
67+
data, err := s.GetPvtDataByBlockNum(uint64(blkNum), nil)
68+
assert.NoError(t, err)
69+
assert.NotNil(t, data)
70+
t.Logf("pvtdata = %s\n", spew.Sdump(data))
71+
}

0 commit comments

Comments
 (0)