Skip to content

Commit f4fe817

Browse files
committed
[FAB-9442] Enhance ledger state listener mechanism
This CR enhance ledger state listener mechanism and allows - Multiple state listeners for a single namespace - A single listener instance for multiple namespaces - A callback to the listeners upon the commit of the state so that the listener has a choice if it wants to hold the processing of the updates till the state changes are committed This enhancement is expected to be used by dicovery code, maintaining versioned configuration updates [FAB-9203], and local peer state [FAB-8864] Change-Id: I905740e370c3eb083d6a768bad04106c40ce99e1 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 968d12b commit f4fe817

13 files changed

+441
-83
lines changed

core/ledger/cceventmgmt/lsccstate_listener.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@ import (
1616
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
1717
)
1818

19+
const (
20+
lsccNamespace = "lscc"
21+
)
22+
1923
// KVLedgerLSCCStateListener listens for state changes on 'lscc' namespace
2024
type KVLedgerLSCCStateListener struct {
2125
}
2226

2327
// HandleStateUpdates iterates over key-values being written in the 'lscc' namespace (which indicates deployment of a chaincode)
2428
// and invokes `HandleChaincodeDeploy` function on chaincode event manager (which in turn is responsible for creation of statedb
2529
// artifacts for the chaincode statedata)
26-
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates) error {
27-
kvWrites := stateUpdates.([]*kvrwset.KVWrite)
30+
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error {
31+
kvWrites := stateUpdates[lsccNamespace].([]*kvrwset.KVWrite)
2832
logger.Debugf("Channel [%s]: Handling state updates in LSCC namespace - stateUpdates=%#v", channelName, kvWrites)
2933
chaincodeDefs := []*ChaincodeDefinition{}
3034
for _, kvWrite := range kvWrites {
@@ -48,3 +52,13 @@ func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string
4852
}
4953
return GetMgr().HandleChaincodeDeploy(channelName, chaincodeDefs)
5054
}
55+
56+
// InterestedInNamespaces implements function from interface `ledger.StateListener`
57+
func (listener *KVLedgerLSCCStateListener) InterestedInNamespaces() []string {
58+
return []string{lsccNamespace}
59+
}
60+
61+
// StateCommitDone implements function from interface `ledger.StateListener` as a NOOP
62+
func (listener *KVLedgerLSCCStateListener) StateCommitDone(channelName string) {
63+
// NOOP
64+
}

core/ledger/cceventmgmt/mgmt_test.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/golang/protobuf/proto"
1414
"github.com/hyperledger/fabric/common/flogging"
1515
"github.com/hyperledger/fabric/core/common/ccprovider"
16+
"github.com/hyperledger/fabric/core/ledger"
1617
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
1718
"github.com/stretchr/testify/assert"
1819
)
@@ -90,9 +91,11 @@ func TestLSCCListener(t *testing.T) {
9091
sampleChaincodeData1 := &ccprovider.ChaincodeData{Name: cc1Def.Name, Version: cc1Def.Version, Id: cc1Def.Hash}
9192
sampleChaincodeDataBytes1, err := proto.Marshal(sampleChaincodeData1)
9293
assert.NoError(t, err, "")
93-
lsccStateListener.HandleStateUpdates(channelName, []*kvrwset.KVWrite{
94-
{Key: cc1Def.Name, Value: sampleChaincodeDataBytes1},
95-
})
94+
lsccStateListener.HandleStateUpdates(channelName,
95+
ledger.StateUpdates{
96+
lsccNamespace: []*kvrwset.KVWrite{{Key: cc1Def.Name, Value: sampleChaincodeDataBytes1}},
97+
},
98+
50)
9699
assert.Contains(t, handler1.eventsRecieved, &mockEvent{cc1Def, ccDBArtifactsTar})
97100
})
98101

@@ -101,9 +104,11 @@ func TestLSCCListener(t *testing.T) {
101104
sampleChaincodeData2 := &ccprovider.ChaincodeData{Name: cc2Def.Name, Version: cc2Def.Version, Id: cc2Def.Hash}
102105
sampleChaincodeDataBytes2, err := proto.Marshal(sampleChaincodeData2)
103106
assert.NoError(t, err, "")
104-
lsccStateListener.HandleStateUpdates(channelName, []*kvrwset.KVWrite{
105-
{Key: cc2Def.Name, Value: sampleChaincodeDataBytes2, IsDelete: true},
106-
})
107+
lsccStateListener.HandleStateUpdates(channelName,
108+
ledger.StateUpdates{
109+
lsccNamespace: []*kvrwset.KVWrite{{Key: cc2Def.Name, Value: sampleChaincodeDataBytes2, IsDelete: true}},
110+
},
111+
50)
107112
assert.NotContains(t, handler1.eventsRecieved, &mockEvent{cc2Def, ccDBArtifactsTar})
108113
})
109114

@@ -112,9 +117,11 @@ func TestLSCCListener(t *testing.T) {
112117
sampleChaincodeData3 := &ccprovider.ChaincodeData{Name: cc3Def.Name, Version: cc3Def.Version, Id: cc3Def.Hash}
113118
sampleChaincodeDataBytes3, err := proto.Marshal(sampleChaincodeData3)
114119
assert.NoError(t, err, "")
115-
lsccStateListener.HandleStateUpdates(channelName, []*kvrwset.KVWrite{
116-
{Key: cc3Def.Name, Value: sampleChaincodeDataBytes3},
117-
})
120+
lsccStateListener.HandleStateUpdates(channelName,
121+
ledger.StateUpdates{
122+
lsccNamespace: []*kvrwset.KVWrite{{Key: cc3Def.Name, Value: sampleChaincodeDataBytes3}},
123+
},
124+
50)
118125
assert.NotContains(t, handler1.eventsRecieved, &mockEvent{cc3Def, ccDBArtifactsTar})
119126
})
120127
}

core/ledger/kvledger/kv_ledger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type kvLedger struct {
4141
// NewKVLedger constructs new `KVLedger`
4242
func newKVLedger(ledgerID string, blockStore *ledgerstorage.Store,
4343
versionedDB privacyenabledstate.DB, historyDB historydb.HistoryDB,
44-
stateListeners ledger.StateListeners) (*kvLedger, error) {
44+
stateListeners []ledger.StateListener) (*kvLedger, error) {
4545

4646
logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
4747

core/ledger/kvledger/kv_ledger_provider.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type Provider struct {
4242
ledgerStoreProvider *ledgerstorage.Provider
4343
vdbProvider privacyenabledstate.DBProvider
4444
historydbProvider historydb.HistoryDBProvider
45-
stateListeners ledger.StateListeners
45+
stateListeners []ledger.StateListener
4646
}
4747

4848
// NewProvider instantiates a new Provider.
@@ -73,7 +73,7 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
7373
}
7474

7575
// Initialize implements the corresponding method from interface ledger.PeerLedgerProvider
76-
func (provider *Provider) Initialize(stateListeners ledger.StateListeners) {
76+
func (provider *Provider) Initialize(stateListeners []ledger.StateListener) {
7777
provider.stateListeners = stateListeners
7878
}
7979

core/ledger/kvledger/state_listener_test.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func TestStateListener(t *testing.T) {
2525
// create a listener and register it to listen to state change in a namespace
2626
channelid := "testLedger"
2727
namespace := "testchaincode"
28-
mockListener := &mockStateListener{}
29-
provider.Initialize(ledger.StateListeners{namespace: mockListener})
28+
mockListener := &mockStateListener{namespace: namespace}
29+
provider.Initialize([]ledger.StateListener{mockListener})
3030

3131
bg, gb := testutil.NewBlockGenerator(t, channelid, false)
3232
lgr, err := provider.Create(gb)
@@ -89,15 +89,24 @@ func TestStateListener(t *testing.T) {
8989

9090
type mockStateListener struct {
9191
channelName string
92+
namespace string
9293
kvWrites []*kvrwset.KVWrite
9394
}
9495

95-
func (l *mockStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates) error {
96+
func (l *mockStateListener) InterestedInNamespaces() []string {
97+
return []string{l.namespace}
98+
}
99+
100+
func (l *mockStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error {
96101
l.channelName = channelName
97-
l.kvWrites = stateUpdates.([]*kvrwset.KVWrite)
102+
l.kvWrites = stateUpdates[l.namespace].([]*kvrwset.KVWrite)
98103
return nil
99104
}
100105

106+
func (l *mockStateListener) StateCommitDone(channelID string) {
107+
// NOOP
108+
}
109+
101110
func (l *mockStateListener) reset() {
102111
l.channelName = ""
103112
l.kvWrites = nil

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go

+68-35
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,27 @@ type LockBasedTxMgr struct {
2727
ledgerid string
2828
db privacyenabledstate.DB
2929
validator validator.Validator
30-
batch *privacyenabledstate.UpdateBatch
31-
currentBlock *common.Block
32-
stateListeners ledger.StateListeners
30+
stateListeners []ledger.StateListener
3331
commitRWLock sync.RWMutex
32+
current *current
33+
}
34+
35+
type current struct {
36+
block *common.Block
37+
batch *privacyenabledstate.UpdateBatch
38+
listeners []ledger.StateListener
39+
}
40+
41+
func (c *current) blockNum() uint64 {
42+
return c.block.Header.Number
43+
}
44+
45+
func (c *current) maxTxNumber() uint64 {
46+
return uint64(len(c.block.Data.Data)) - 1
3447
}
3548

3649
// NewLockBasedTxMgr constructs a new instance of NewLockBasedTxMgr
37-
func NewLockBasedTxMgr(ledgerid string, db privacyenabledstate.DB, stateListeners ledger.StateListeners) *LockBasedTxMgr {
50+
func NewLockBasedTxMgr(ledgerid string, db privacyenabledstate.DB, stateListeners []ledger.StateListener) *LockBasedTxMgr {
3851
db.Open()
3952
txmgr := &LockBasedTxMgr{ledgerid: ledgerid, db: db, stateListeners: stateListeners}
4053
txmgr.validator = valimpl.NewStatebasedValidator(txmgr, db)
@@ -71,30 +84,28 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
7184
logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
7285
batch, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
7386
if err != nil {
74-
txmgr.clearCache()
87+
txmgr.reset()
88+
return err
89+
}
90+
txmgr.current = &current{block: block, batch: batch}
91+
if err := txmgr.invokeNamespaceListeners(); err != nil {
92+
txmgr.reset()
7593
return err
7694
}
77-
txmgr.currentBlock = block
78-
txmgr.batch = batch
79-
return txmgr.invokeNamespaceListeners(batch)
95+
return nil
8096
}
8197

82-
func (txmgr *LockBasedTxMgr) invokeNamespaceListeners(batch *privacyenabledstate.UpdateBatch) error {
83-
namespaces := batch.PubUpdates.GetUpdatedNamespaces()
84-
for _, namespace := range namespaces {
85-
listener := txmgr.stateListeners[namespace]
86-
if listener == nil {
98+
func (txmgr *LockBasedTxMgr) invokeNamespaceListeners() error {
99+
for _, listener := range txmgr.stateListeners {
100+
stateUpdatesForListener := extractStateUpdates(txmgr.current.batch, listener.InterestedInNamespaces())
101+
if len(stateUpdatesForListener) == 0 {
87102
continue
88103
}
89-
logger.Debugf("Invoking listener for state changes over namespace:%s", namespace)
90-
updatesMap := batch.PubUpdates.GetUpdates(namespace)
91-
var kvwrites []*kvrwset.KVWrite
92-
for key, versionedValue := range updatesMap {
93-
kvwrites = append(kvwrites, &kvrwset.KVWrite{Key: key, IsDelete: versionedValue.Value == nil, Value: versionedValue.Value})
94-
}
95-
if err := listener.HandleStateUpdates(txmgr.ledgerid, kvwrites); err != nil {
104+
txmgr.current.listeners = append(txmgr.current.listeners, listener)
105+
if err := listener.HandleStateUpdates(txmgr.ledgerid, stateUpdatesForListener, txmgr.current.blockNum()); err != nil {
96106
return err
97107
}
108+
logger.Debugf("Invoking listener for state changes:%s", listener)
98109
}
99110
return nil
100111
}
@@ -106,35 +117,28 @@ func (txmgr *LockBasedTxMgr) Shutdown() {
106117

107118
// Commit implements method in interface `txmgmt.TxMgr`
108119
func (txmgr *LockBasedTxMgr) Commit() error {
109-
// If statedb implementation needed bulk read optimization, cache might have been populated by
110-
// ValidateAndPrepare(). Once the block is validated and committed, populated cache needs to
111-
// be cleared.
112-
defer txmgr.clearCache()
113-
120+
defer txmgr.reset()
114121
logger.Debugf("Committing updates to state database")
115122
txmgr.commitRWLock.Lock()
116123
defer txmgr.commitRWLock.Unlock()
117124
logger.Debugf("Write lock acquired for committing updates to state database")
118-
if txmgr.batch == nil {
125+
if txmgr.current == nil {
119126
panic("validateAndPrepare() method should have been called before calling commit()")
120127
}
121-
defer func() { txmgr.batch = nil }()
122-
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.batch,
123-
version.NewHeight(txmgr.currentBlock.Header.Number, uint64(len(txmgr.currentBlock.Data.Data)-1))); err != nil {
128+
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
129+
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
124130
return err
125131
}
126132
logger.Debugf("Updates committed to state database")
127-
133+
// In the case of error state listeners will not recieve this call - instead a peer panic is caused by the ledger upon receiveing
134+
// an error from this function
135+
txmgr.updateStateListeners()
128136
return nil
129137
}
130138

131139
// Rollback implements method in interface `txmgmt.TxMgr`
132140
func (txmgr *LockBasedTxMgr) Rollback() {
133-
txmgr.batch = nil
134-
// If statedb implementation needed bulk read optimization, cache might have been populated by
135-
// ValidateAndPrepareBatch(). As the block commit is rollbacked, populated cache needs to
136-
// be cleared now.
137-
txmgr.clearCache()
141+
txmgr.reset()
138142
}
139143

140144
// clearCache empty the cache maintained by the statedb implementation
@@ -166,3 +170,32 @@ func (txmgr *LockBasedTxMgr) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvt
166170
logger.Debugf("Committing block %d to state database", block.Header.Number)
167171
return txmgr.Commit()
168172
}
173+
174+
func extractStateUpdates(batch *privacyenabledstate.UpdateBatch, namespaces []string) ledger.StateUpdates {
175+
stateupdates := make(ledger.StateUpdates)
176+
for _, namespace := range namespaces {
177+
updatesMap := batch.PubUpdates.GetUpdates(namespace)
178+
var kvwrites []*kvrwset.KVWrite
179+
for key, versionedValue := range updatesMap {
180+
kvwrites = append(kvwrites, &kvrwset.KVWrite{Key: key, IsDelete: versionedValue.Value == nil, Value: versionedValue.Value})
181+
if len(kvwrites) > 0 {
182+
stateupdates[namespace] = kvwrites
183+
}
184+
}
185+
}
186+
return stateupdates
187+
}
188+
189+
func (txmgr *LockBasedTxMgr) updateStateListeners() {
190+
for _, l := range txmgr.current.listeners {
191+
l.StateCommitDone(txmgr.ledgerid)
192+
}
193+
}
194+
195+
func (txmgr *LockBasedTxMgr) reset() {
196+
txmgr.current = nil
197+
// If statedb implementation needed bulk read optimization, cache might have been populated by
198+
// ValidateAndPrepare(). Once the block is validated and committed, populated cache needs to
199+
// be cleared.
200+
defer txmgr.clearCache()
201+
}

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/pkg_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ var testEnvs = []testEnv{
4949
&lockBasedEnv{name: couchDBtestEnvName, testDBEnv: &privacyenabledstate.CouchDBCommonStorageTestEnv{}},
5050
}
5151

52+
var testEnvsMap = map[string]testEnv{
53+
levelDBtestEnvName: testEnvs[0],
54+
couchDBtestEnvName: testEnvs[1],
55+
}
56+
5257
///////////// LevelDB Environment //////////////
5358

5459
type lockBasedEnv struct {

0 commit comments

Comments
 (0)