Skip to content

Commit 6a00db3

Browse files
committed
[FAB-6555] Purge manager for pvt statedb data
This CR introduces a purge manager that manages the schedule of expiry of pvt data in the expiry keeper and actual purging of the data from pvt statedb Change-Id: I5f663a36e65ca0ab91dbfeeb664549d7f25bfbb3 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 0ca1af5 commit 6a00db3

File tree

7 files changed

+556
-3
lines changed

7 files changed

+556
-3
lines changed

core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go

-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func (s *CommonStorageDB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*sta
8484
if !ok {
8585
return nil
8686
}
87-
8887
// Here, hashedKeys are merged into pubKeys to get a combined set of keys for combined loading
8988
for _, key := range hashedKeys {
9089
ns := deriveHashedDataNs(key.Namespace, key.CollectionName)

core/ledger/kvledger/txmgmt/privacyenabledstate/db.go

+39
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
77
package privacyenabledstate
88

99
import (
10+
"fmt"
11+
1012
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
1113
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
1214
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
@@ -37,6 +39,13 @@ type DB interface {
3739
ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error
3840
}
3941

42+
// PvtdataCompositeKey encloses Namespace, CollectionName and Key components
43+
type PvtdataCompositeKey struct {
44+
Namespace string
45+
CollectionName string
46+
Key string
47+
}
48+
4049
// HashedCompositeKey encloses Namespace, CollectionName and KeyHash components
4150
type HashedCompositeKey struct {
4251
Namespace string
@@ -150,3 +159,33 @@ func (h HashedUpdateBatch) Put(ns, coll string, key []byte, value []byte, versio
150159
func (h HashedUpdateBatch) Delete(ns, coll string, key []byte, version *version.Height) {
151160
h.UpdateMap.Delete(ns, coll, string(key), version)
152161
}
162+
163+
// ToCompositeKeyMap rearranges the update batch data in the form of a single map
164+
func (h HashedUpdateBatch) ToCompositeKeyMap() map[HashedCompositeKey]*statedb.VersionedValue {
165+
m := make(map[HashedCompositeKey]*statedb.VersionedValue)
166+
for ns, nsBatch := range h.UpdateMap {
167+
for _, coll := range nsBatch.GetCollectionNames() {
168+
for key, vv := range nsBatch.GetUpdates(coll) {
169+
m[HashedCompositeKey{ns, coll, key}] = vv
170+
}
171+
}
172+
}
173+
return m
174+
}
175+
176+
// ToCompositeKeyMap rearranges the update batch data in the form of a single map
177+
func (p PvtUpdateBatch) ToCompositeKeyMap() map[PvtdataCompositeKey]*statedb.VersionedValue {
178+
m := make(map[PvtdataCompositeKey]*statedb.VersionedValue)
179+
for ns, nsBatch := range p.UpdateMap {
180+
for _, coll := range nsBatch.GetCollectionNames() {
181+
for key, vv := range nsBatch.GetUpdates(coll) {
182+
m[PvtdataCompositeKey{ns, coll, key}] = vv
183+
}
184+
}
185+
}
186+
return m
187+
}
188+
189+
func (hck *HashedCompositeKey) String() string {
190+
return fmt.Sprintf("ns=%s, collection=%s, keyHash=%x", hck.Namespace, hck.CollectionName, hck.KeyHash)
191+
}

core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,6 @@ func testDB(t *testing.T, env TestEnv) {
129129
assert.Nil(t, vv)
130130
}
131131

132-
//TODO add tests for functions GetPrivateStateMultipleKeys and GetPrivateStateRangeScanIterator
133-
134132
func TestGetStateMultipleKeys(t *testing.T) {
135133
for _, env := range testEnvs {
136134
t.Run(env.GetName(), func(t *testing.T) {
@@ -387,6 +385,26 @@ func testKey(i int) string {
387385
return fmt.Sprintf("key%d", i)
388386
}
389387

388+
func TestCompositeKeyMap(t *testing.T) {
389+
b := NewPvtUpdateBatch()
390+
b.Put("ns1", "coll1", "key1", []byte("testVal1"), nil)
391+
b.Delete("ns1", "coll2", "key2", nil)
392+
b.Put("ns2", "coll1", "key1", []byte("testVal3"), nil)
393+
b.Put("ns2", "coll2", "key2", []byte("testVal4"), nil)
394+
m := b.ToCompositeKeyMap()
395+
testutil.AssertEquals(t, len(m), 4)
396+
vv, ok := m[PvtdataCompositeKey{"ns1", "coll1", "key1"}]
397+
testutil.AssertEquals(t, ok, true)
398+
testutil.AssertEquals(t, vv.Value, []byte("testVal1"))
399+
vv, ok = m[PvtdataCompositeKey{"ns1", "coll2", "key2"}]
400+
testutil.AssertNil(t, vv.Value)
401+
testutil.AssertEquals(t, ok, true)
402+
_, ok = m[PvtdataCompositeKey{"ns2", "coll1", "key1"}]
403+
testutil.AssertEquals(t, ok, true)
404+
_, ok = m[PvtdataCompositeKey{"ns2", "coll2", "key2"}]
405+
testutil.AssertEquals(t, ok, true)
406+
}
407+
390408
func putPvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
391409
updates.PvtUpdates.Put(ns, coll, key, value, ver)
392410
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtstatepurgemgmt
8+
9+
import (
10+
math "math"
11+
12+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
13+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
14+
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
15+
"github.com/hyperledger/fabric/core/ledger/util"
16+
)
17+
18+
type expiryScheduleBuilder struct {
19+
btlPolicy pvtdatapolicy.BTLPolicy
20+
scheduleEntries map[expiryInfoKey]*PvtdataKeys
21+
}
22+
23+
func newExpiryScheduleBuilder(btlPolicy pvtdatapolicy.BTLPolicy) *expiryScheduleBuilder {
24+
return &expiryScheduleBuilder{btlPolicy, make(map[expiryInfoKey]*PvtdataKeys)}
25+
}
26+
27+
func (builder *expiryScheduleBuilder) add(ns, coll, key string, keyHash []byte, versionedValue *statedb.VersionedValue) {
28+
committingBlk := versionedValue.Version.BlockNum
29+
expiryBlk := builder.btlPolicy.GetExpiringBlock(ns, coll, committingBlk)
30+
if isDelete(versionedValue) || neverExpires(expiryBlk) {
31+
return
32+
}
33+
expinfoKey := expiryInfoKey{committingBlk: committingBlk, expiryBlk: expiryBlk}
34+
pvtdataKeys, ok := builder.scheduleEntries[expinfoKey]
35+
if !ok {
36+
pvtdataKeys = newPvtdataKeys()
37+
builder.scheduleEntries[expinfoKey] = pvtdataKeys
38+
}
39+
pvtdataKeys.add(ns, coll, key, keyHash)
40+
}
41+
42+
func isDelete(versionedValue *statedb.VersionedValue) bool {
43+
return versionedValue.Value == nil
44+
}
45+
46+
func neverExpires(expiryBlk uint64) bool {
47+
return expiryBlk == math.MaxUint64
48+
}
49+
50+
func (builder *expiryScheduleBuilder) getExpiryInfo() []*expiryInfo {
51+
var listExpinfo []*expiryInfo
52+
for expinfoKey, pvtdataKeys := range builder.scheduleEntries {
53+
expinfoKeyCopy := expinfoKey
54+
listExpinfo = append(listExpinfo, &expiryInfo{expiryInfoKey: &expinfoKeyCopy, pvtdataKeys: pvtdataKeys})
55+
}
56+
return listExpinfo
57+
}
58+
59+
func buildExpirySchedule(
60+
btlPolicy pvtdatapolicy.BTLPolicy,
61+
pvtUpdates *privacyenabledstate.PvtUpdateBatch,
62+
hashedUpdates *privacyenabledstate.HashedUpdateBatch) []*expiryInfo {
63+
64+
hashedUpdateKeys := hashedUpdates.ToCompositeKeyMap()
65+
expiryScheduleBuilder := newExpiryScheduleBuilder(btlPolicy)
66+
67+
// Iterate through the private data updates and for each key add into the expiry schedule
68+
// i.e., when these private data key and it's hashed-keys are going to be expired
69+
// Note that the 'hashedUpdateKeys' may be superset of the pvtUpdates. This is because,
70+
// the peer may not receive all the private data either because the peer is not eligible for certain private data
71+
// or because we allow proceeding with the missing private data data
72+
for pvtUpdateKey, vv := range pvtUpdates.ToCompositeKeyMap() {
73+
keyHash := util.ComputeStringHash(pvtUpdateKey.Key)
74+
expiryScheduleBuilder.add(pvtUpdateKey.Namespace, pvtUpdateKey.CollectionName, pvtUpdateKey.Key, keyHash, vv)
75+
delete(hashedUpdateKeys, privacyenabledstate.HashedCompositeKey{
76+
Namespace: pvtUpdateKey.Namespace, CollectionName: pvtUpdateKey.CollectionName, KeyHash: string(keyHash)})
77+
}
78+
79+
// Add entries for the leftover key hashes i.e., the hashes corresponding to which there is not private key is present
80+
for hashedUpdateKey, vv := range hashedUpdateKeys {
81+
expiryScheduleBuilder.add(hashedUpdateKey.Namespace, hashedUpdateKey.CollectionName, "", []byte(hashedUpdateKey.KeyHash), vv)
82+
}
83+
return expiryScheduleBuilder.getExpiryInfo()
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package pvtstatepurgemgmt
8+
9+
import (
10+
"fmt"
11+
"testing"
12+
13+
"github.com/davecgh/go-spew/spew"
14+
15+
"github.com/hyperledger/fabric/common/ledger/testutil"
16+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
17+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
18+
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
19+
"github.com/hyperledger/fabric/core/ledger/util"
20+
"github.com/spf13/viper"
21+
)
22+
23+
func TestBuildExpirySchedule(t *testing.T) {
24+
ledgerid := "testledger-BuildExpirySchedule"
25+
viper.Set(fmt.Sprintf("ledger.pvtdata.btlpolicy.%s.ns1.coll1", ledgerid), 1)
26+
viper.Set(fmt.Sprintf("ledger.pvtdata.btlpolicy.%s.ns1.coll2", ledgerid), 2)
27+
viper.Set(fmt.Sprintf("ledger.pvtdata.btlpolicy.%s.ns2.coll3", ledgerid), 3)
28+
29+
btlPolicy, _ := pvtdatapolicy.GetBTLPolicy(ledgerid)
30+
31+
updates := privacyenabledstate.NewUpdateBatch()
32+
updates.PubUpdates.Put("ns1", "pubkey1", []byte("pubvalue1"), version.NewHeight(1, 1))
33+
putPvtUpdates(t, updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1"), version.NewHeight(1, 1))
34+
putPvtUpdates(t, updates, "ns1", "coll2", "pvtkey2", []byte("pvtvalue2"), version.NewHeight(2, 1))
35+
putPvtUpdates(t, updates, "ns2", "coll3", "pvtkey3", []byte("pvtvalue3"), version.NewHeight(3, 1))
36+
putPvtUpdates(t, updates, "ns3", "coll4", "pvtkey4", []byte("pvtvalue4"), version.NewHeight(4, 1))
37+
38+
listExpinfo := buildExpirySchedule(btlPolicy, updates.PvtUpdates, updates.HashUpdates)
39+
t.Logf("listExpinfo=%s", spew.Sdump(listExpinfo))
40+
41+
pvtdataKeys1 := newPvtdataKeys()
42+
pvtdataKeys1.add("ns1", "coll1", "pvtkey1", util.ComputeStringHash("pvtkey1"))
43+
44+
pvtdataKeys2 := newPvtdataKeys()
45+
pvtdataKeys2.add("ns1", "coll2", "pvtkey2", util.ComputeStringHash("pvtkey2"))
46+
47+
pvtdataKeys3 := newPvtdataKeys()
48+
pvtdataKeys3.add("ns2", "coll3", "pvtkey3", util.ComputeStringHash("pvtkey3"))
49+
50+
expectedListExpInfo := []*expiryInfo{
51+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 3, committingBlk: 1}, pvtdataKeys: pvtdataKeys1},
52+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 5, committingBlk: 2}, pvtdataKeys: pvtdataKeys2},
53+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 7, committingBlk: 3}, pvtdataKeys: pvtdataKeys3},
54+
}
55+
56+
testutil.AssertEquals(t, len(listExpinfo), 3)
57+
testutil.AssertContainsAll(t, listExpinfo, expectedListExpInfo)
58+
}
59+
60+
func putPvtUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
61+
updates.PvtUpdates.Put(ns, coll, key, value, ver)
62+
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
63+
}
64+
65+
func deletePvtUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) {
66+
updates.PvtUpdates.Delete(ns, coll, key, ver)
67+
updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver)
68+
}

0 commit comments

Comments
 (0)