Skip to content

Commit 3d05ff3

Browse files
committed
[FAB-6619] purge pvtdata from pvt block store
This CR includes - Make the pvtdata key based on collection so that purge does not have to perform a read and can simply issue delete for the keys - Store the expiry schedule based on the expirying block for collections - Purge the expired pvt data periodically (default every 500 block) Change-Id: I8700de8a3d24436ac88472b4f46e8f994475e6c6 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 7823ea7 commit 3d05ff3

12 files changed

+734
-159
lines changed

core/ledger/ledgerconfig/ledger_config.go

+10
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,16 @@ func GetMaxBatchUpdateSize() int {
108108
return maxBatchUpdateSize
109109
}
110110

111+
// GetPvtdataStorePurgeInterval returns the interval in the terms of number of blocks
112+
// when the purge for the expired data would be performed
113+
func GetPvtdataStorePurgeInterval() uint64 {
114+
purgeInterval := viper.GetInt("ledger.pvtdataStore.purgeInterval")
115+
if purgeInterval <= 0 {
116+
purgeInterval = 100
117+
}
118+
return uint64(purgeInterval)
119+
}
120+
111121
//IsHistoryDBEnabled exposes the historyDatabase variable
112122
func IsHistoryDBEnabled() bool {
113123
return viper.GetBool(confEnableHistoryDatabase)

core/ledger/pvtdatastorage/expiry_data.pb.go

+107
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
syntax = "proto3";
8+
9+
option go_package = "github.com/hyperledger/fabric/core/ledger/pvtdatastorage";
10+
11+
package pvtdatastorage;
12+
13+
message ExpiryData {
14+
map<string, Collections> map = 1;
15+
}
16+
17+
message Collections {
18+
map<string, TxNums> map = 1;
19+
}
20+
21+
message TxNums {
22+
repeated uint64 list = 1;
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtdatastorage
8+
9+
func newExpiryData() *ExpiryData {
10+
return &ExpiryData{make(map[string]*Collections)}
11+
}
12+
13+
func newCollections() *Collections {
14+
return &Collections{make(map[string]*TxNums)}
15+
}
16+
17+
func (e *ExpiryData) add(ns, coll string, txNum uint64) {
18+
collections, ok := e.Map[ns]
19+
if !ok {
20+
collections = newCollections()
21+
e.Map[ns] = collections
22+
}
23+
txNums, ok := collections.Map[coll]
24+
if !ok {
25+
txNums = &TxNums{}
26+
collections.Map[coll] = txNums
27+
}
28+
txNums.List = append(txNums.List, txNum)
29+
}

core/ledger/pvtdatastorage/helper.go

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtdatastorage
8+
9+
import (
10+
"math"
11+
12+
"github.com/hyperledger/fabric/core/ledger"
13+
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
14+
"github.com/hyperledger/fabric/protos/ledger/rwset"
15+
)
16+
17+
func prepareStoreEntries(blockNum uint64, pvtdata []*ledger.TxPvtData, btlPolicy pvtdatapolicy.BTLPolicy) ([]*dataEntry, []*expiryEntry) {
18+
dataEntries := prepareDataEntries(blockNum, pvtdata)
19+
expiryEntries := prepareExpiryEntries(blockNum, dataEntries, btlPolicy)
20+
return dataEntries, expiryEntries
21+
}
22+
23+
func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry {
24+
var dataEntries []*dataEntry
25+
for _, txPvtdata := range pvtData {
26+
for _, nsPvtdata := range txPvtdata.WriteSet.NsPvtRwset {
27+
for _, collPvtdata := range nsPvtdata.CollectionPvtRwset {
28+
txnum := txPvtdata.SeqInBlock
29+
ns := nsPvtdata.Namespace
30+
coll := collPvtdata.CollectionName
31+
dataKey := &dataKey{blockNum, txnum, ns, coll}
32+
dataEntries = append(dataEntries, &dataEntry{key: dataKey, value: collPvtdata})
33+
}
34+
}
35+
}
36+
return dataEntries
37+
}
38+
39+
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, btlPolicy pvtdatapolicy.BTLPolicy) []*expiryEntry {
40+
mapByExpiringBlk := make(map[uint64]*ExpiryData)
41+
for _, dataEntry := range dataEntries {
42+
expiringBlk := btlPolicy.GetExpiringBlock(dataEntry.key.ns, dataEntry.key.coll, dataEntry.key.blkNum)
43+
if neverExpires(expiringBlk) {
44+
continue
45+
}
46+
expiryData, ok := mapByExpiringBlk[expiringBlk]
47+
if !ok {
48+
expiryData = newExpiryData()
49+
mapByExpiringBlk[expiringBlk] = expiryData
50+
}
51+
expiryData.add(dataEntry.key.ns, dataEntry.key.coll, dataEntry.key.txNum)
52+
}
53+
var expiryEntries []*expiryEntry
54+
for expiryBlk, expiryData := range mapByExpiringBlk {
55+
expiryKey := &expiryKey{expiringBlk: expiryBlk, committingBlk: committingBlk}
56+
expiryEntries = append(expiryEntries, &expiryEntry{key: expiryKey, value: expiryData})
57+
}
58+
return expiryEntries
59+
}
60+
61+
func deriveDataKeys(expiryEntry *expiryEntry) []*dataKey {
62+
var dataKeys []*dataKey
63+
for ns, colls := range expiryEntry.value.Map {
64+
for coll, txNums := range colls.Map {
65+
for _, txNum := range txNums.List {
66+
dataKeys = append(dataKeys, &dataKey{expiryEntry.key.committingBlk, txNum, ns, coll})
67+
}
68+
}
69+
}
70+
return dataKeys
71+
}
72+
73+
func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool {
74+
return filter == nil || filter.Has(dataKey.ns, dataKey.coll)
75+
}
76+
77+
func isExpired(dataKey *dataKey, btl pvtdatapolicy.BTLPolicy, latestBlkNum uint64) bool {
78+
return latestBlkNum >= btl.GetExpiringBlock(dataKey.ns, dataKey.coll, dataKey.blkNum)
79+
}
80+
81+
func neverExpires(expiringBlkNum uint64) bool {
82+
return expiringBlkNum == math.MaxUint64
83+
}
84+
85+
type txPvtdataAssembler struct {
86+
blockNum, txNum uint64
87+
txWset *rwset.TxPvtReadWriteSet
88+
currentNsWSet *rwset.NsPvtReadWriteSet
89+
firstCall bool
90+
}
91+
92+
func newTxPvtdataAssembler(blockNum, txNum uint64) *txPvtdataAssembler {
93+
return &txPvtdataAssembler{blockNum, txNum, &rwset.TxPvtReadWriteSet{}, nil, true}
94+
}
95+
96+
func (a *txPvtdataAssembler) add(ns string, collPvtWset *rwset.CollectionPvtReadWriteSet) {
97+
// start a NsWset
98+
if a.firstCall {
99+
a.currentNsWSet = &rwset.NsPvtReadWriteSet{Namespace: ns}
100+
a.firstCall = false
101+
}
102+
103+
// if a new ns started, add the existing NsWset to TxWset and start a new one
104+
if a.currentNsWSet.Namespace != ns {
105+
a.txWset.NsPvtRwset = append(a.txWset.NsPvtRwset, a.currentNsWSet)
106+
a.currentNsWSet = &rwset.NsPvtReadWriteSet{Namespace: ns}
107+
}
108+
// add the collWset to the current NsWset
109+
a.currentNsWSet.CollectionPvtRwset = append(a.currentNsWSet.CollectionPvtRwset, collPvtWset)
110+
}
111+
112+
func (a *txPvtdataAssembler) done() {
113+
if a.currentNsWSet != nil {
114+
a.txWset.NsPvtRwset = append(a.txWset.NsPvtRwset, a.currentNsWSet)
115+
}
116+
a.currentNsWSet = nil
117+
}
118+
119+
func (a *txPvtdataAssembler) getTxPvtdata() *ledger.TxPvtData {
120+
a.done()
121+
return &ledger.TxPvtData{SeqInBlock: a.txNum, WriteSet: a.txWset}
122+
}

0 commit comments

Comments
 (0)