Skip to content

Commit 0851694

Browse files
committed
[FAB-9452] Enhance chaincode event listers
This CR: - Allows registeration of multiple listeners for chaincode life-cycle events - Introduces a call back to the listeners when the chaincode deploy transaction commits or chaincode install completes Change-Id: Ic9e26b430d8b0c844219edb454e5d410477c9516 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent f4fe817 commit 0851694

File tree

8 files changed

+144
-44
lines changed

8 files changed

+144
-44
lines changed

core/cclifecycle/subscription.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ type Subscription struct {
2424

2525
type depCCsRetriever func(Query, ChaincodePredicate, ...string) (chaincode.MetadataSet, error)
2626

27-
// HandleChaincodeDeploy is expected to be invoked when a chaincode is deployed via a deploy transaction
27+
// HandleChaincodeDeploy is expected to be invoked when a chaincode is deployed via a deploy transaction and the chaicndoe was already
28+
// installed on the peer. This also gets invoked when an already deployed chaincode is installed on the peer
2829
func (sub *Subscription) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.ChaincodeDefinition, dbArtifactsTar []byte) error {
2930
logger.Debug("Channel", sub.channel, "got a new deployment:", chaincodeDefinition)
3031
query, err := sub.newQuery()
@@ -48,6 +49,12 @@ func (sub *Subscription) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.
4849
return nil
4950
}
5051

52+
// ChaincodeDeployDone gets invoked when the chaincode deploy transaction or chaincode install
53+
// (the context in which the above function was invoked)
54+
func (sub *Subscription) ChaincodeDeployDone(succeeded bool) {
55+
// Noop
56+
}
57+
5158
func queryChaincodeDefinitions(query Query, ccs []chaincode.InstalledChaincode, deployedCCs depCCsRetriever) (chaincode.MetadataSet, error) {
5259
// map from string and version to chaincode ID
5360
installedCCsToIDs := make(map[nameVersion][]byte)

core/ledger/cceventmgmt/defs.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,14 @@ func (cdef *ChaincodeDefinition) String() string {
2727
// to be able to listen to chaincode lifecycle events. 'dbArtifactsTar' represents db specific artifacts
2828
// (such as index specs) packaged in a tar
2929
type ChaincodeLifecycleEventListener interface {
30-
// HandleChaincodeDeploy is expected to creates all the necessary statedb structures (such as indexes)
30+
// HandleChaincodeDeploy is invoked when chaincode installed + defined becomes true.
31+
// The expected usage are to creates all the necessary statedb structures (such as indexes) and update
32+
// service discovery info. This function is invoked immediately before the committing the state changes
33+
// that contain chaincode definition or when a chaincode install happens
3134
HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error
35+
// ChaincodeDeployDone is invoked after the chaincode deployment is finished - `succeeded` indicates
36+
// whether the deploy finished successfully
37+
ChaincodeDeployDone(succeeded bool)
3238
}
3339

3440
// ChaincodeInfoProvider interface enables event mgr to retrieve chaincode info for a given chaincode

core/ledger/cceventmgmt/lsccstate_listener.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,5 @@ func (listener *KVLedgerLSCCStateListener) InterestedInNamespaces() []string {
6060

6161
// StateCommitDone implements function from interface `ledger.StateListener` as a NOOP
6262
func (listener *KVLedgerLSCCStateListener) StateCommitDone(channelName string) {
63-
// NOOP
63+
GetMgr().ChaincodeDeployDone(channelName)
6464
}

core/ledger/cceventmgmt/mgmt_test.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,45 @@ func TestCCEventMgmt(t *testing.T) {
4141
setEventMgrForTest(newMgr(mockProvider))
4242
defer clearEventMgrForTest()
4343

44-
handler1, handler2 := &mockHandler{}, &mockHandler{}
44+
handler1, handler2, handler3 := &mockHandler{}, &mockHandler{}, &mockHandler{}
4545
eventMgr := GetMgr()
4646
assert.NotNil(t, eventMgr)
4747
eventMgr.Register("channel1", handler1)
4848
eventMgr.Register("channel2", handler2)
49+
eventMgr.Register("channel1", handler3)
50+
eventMgr.Register("channel2", handler3)
4951

5052
cc2ExpectedEvent := &mockEvent{cc2Def, cc2DBArtifactsTar}
53+
_ = cc2ExpectedEvent
5154
cc3ExpectedEvent := &mockEvent{cc3Def, cc3DBArtifactsTar}
5255

53-
// Deploy cc3 on chain1 - only handler1 should recieve event because cc3 is being deployed only on chain1
56+
// Deploy cc3 on chain1 - handler1 and handler3 should recieve event because cc3 is being deployed only on chain1
5457
eventMgr.HandleChaincodeDeploy("channel1", []*ChaincodeDefinition{cc3Def})
58+
eventMgr.ChaincodeDeployDone("channel1")
5559
assert.Contains(t, handler1.eventsRecieved, cc3ExpectedEvent)
5660
assert.NotContains(t, handler2.eventsRecieved, cc3ExpectedEvent)
61+
assert.Contains(t, handler3.eventsRecieved, cc3ExpectedEvent)
62+
assert.Equal(t, 1, handler1.doneRecievedCount)
63+
assert.Equal(t, 0, handler2.doneRecievedCount)
64+
assert.Equal(t, 1, handler3.doneRecievedCount)
5765

5866
// Deploy cc3 on chain2 as well and this time handler2 should also recieve event
5967
eventMgr.HandleChaincodeDeploy("channel2", []*ChaincodeDefinition{cc3Def})
68+
eventMgr.ChaincodeDeployDone("channel2")
6069
assert.Contains(t, handler2.eventsRecieved, cc3ExpectedEvent)
70+
assert.Equal(t, 1, handler1.doneRecievedCount)
71+
assert.Equal(t, 1, handler2.doneRecievedCount)
72+
assert.Equal(t, 2, handler3.doneRecievedCount)
6173

62-
// Install CC2 - only handler1 should receive event because cc2 is deployed only on chain1 and not on chain2
74+
// Install CC2 - handler1 and handler 3 should receive event because cc2 is deployed only on chain1 and not on chain2
6375
eventMgr.HandleChaincodeInstall(cc2Def, cc2DBArtifactsTar)
76+
eventMgr.ChaincodeInstallDone(true)
6477
assert.Contains(t, handler1.eventsRecieved, cc2ExpectedEvent)
6578
assert.NotContains(t, handler2.eventsRecieved, cc2ExpectedEvent)
79+
assert.Contains(t, handler3.eventsRecieved, cc2ExpectedEvent)
80+
assert.Equal(t, 2, handler1.doneRecievedCount)
81+
assert.Equal(t, 1, handler2.doneRecievedCount)
82+
assert.Equal(t, 3, handler3.doneRecievedCount)
6683
}
6784

6885
func TestLSCCListener(t *testing.T) {
@@ -132,7 +149,8 @@ type mockProvider struct {
132149
}
133150

134151
type mockHandler struct {
135-
eventsRecieved []*mockEvent
152+
eventsRecieved []*mockEvent
153+
doneRecievedCount int
136154
}
137155

138156
type mockEvent struct {
@@ -145,6 +163,10 @@ func (l *mockHandler) HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefini
145163
return nil
146164
}
147165

166+
func (l *mockHandler) ChaincodeDeployDone(succeeded bool) {
167+
l.doneRecievedCount++
168+
}
169+
148170
func newMockProvider() *mockProvider {
149171
return &mockProvider{
150172
make(map[[3]string]bool),

core/ledger/cceventmgmt/mgr.go

+90-36
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
77
package cceventmgmt
88

99
import (
10-
"bytes"
1110
"sync"
1211

1312
"github.com/hyperledger/fabric/common/flogging"
@@ -39,19 +38,15 @@ type Mgr struct {
3938
// we use this lock for contextual use
4039
rwlock sync.RWMutex
4140
infoProvider ChaincodeInfoProvider
42-
ccLifecycleListeners map[string]ChaincodeLifecycleEventListener
43-
// latestChaincodeDeploys maintains last chaincode deployed for a ledger. As stated in the above comment,
44-
// since it is not easy to synchronize across block commit and install activity, this leaves a small window
45-
// where we could miss 'deployed AND installed' state. So, we explicitly maintain the chaincodes deplyed
46-
// in the last block
47-
latestChaincodeDeploys map[string][]*ChaincodeDefinition
41+
ccLifecycleListeners map[string][]ChaincodeLifecycleEventListener
42+
callbackStatus *callbackStatus
4843
}
4944

5045
func newMgr(chaincodeInfoProvider ChaincodeInfoProvider) *Mgr {
5146
return &Mgr{
52-
infoProvider: chaincodeInfoProvider,
53-
ccLifecycleListeners: make(map[string]ChaincodeLifecycleEventListener),
54-
latestChaincodeDeploys: make(map[string][]*ChaincodeDefinition)}
47+
infoProvider: chaincodeInfoProvider,
48+
ccLifecycleListeners: make(map[string][]ChaincodeLifecycleEventListener),
49+
callbackStatus: newCallbackStatus()}
5550
}
5651

5752
// Register registers a ChaincodeLifecycleEventListener for given ledgerid
@@ -60,7 +55,7 @@ func (m *Mgr) Register(ledgerid string, l ChaincodeLifecycleEventListener) {
6055
// write lock to synchronize concurrent 'chaincode install' operations with ledger creation/open
6156
m.rwlock.Lock()
6257
defer m.rwlock.Unlock()
63-
m.ccLifecycleListeners[ledgerid] = l
58+
m.ccLifecycleListeners[ledgerid] = append(m.ccLifecycleListeners[ledgerid], l)
6459
}
6560

6661
// HandleChaincodeDeploy is expected to be invoked when a chaincode is deployed via a deploy transaction
@@ -73,11 +68,8 @@ func (m *Mgr) Register(ledgerid string, l ChaincodeLifecycleEventListener) {
7368
// in this stored `chaincodeDefinitions`
7469
func (m *Mgr) HandleChaincodeDeploy(chainid string, chaincodeDefinitions []*ChaincodeDefinition) error {
7570
logger.Debugf("Channel [%s]: Handling chaincode deploy event for chaincode [%s]", chainid, chaincodeDefinitions)
76-
// Read lock to allow concurrent deploy on multiple channels but to synchronize concurrent `chaincode insall` operation
71+
// Read lock to allow concurrent deploy on multiple channels but to synchronize concurrent `chaincode install` operation
7772
m.rwlock.RLock()
78-
defer m.rwlock.RUnlock()
79-
// TODO, device a mechanism to cleanup entries in this map
80-
m.latestChaincodeDeploys[chainid] = chaincodeDefinitions
8173
for _, chaincodeDefinition := range chaincodeDefinitions {
8274
installed, dbArtifacts, err := m.infoProvider.RetrieveChaincodeArtifacts(chaincodeDefinition)
8375
if err != nil {
@@ -88,6 +80,7 @@ func (m *Mgr) HandleChaincodeDeploy(chainid string, chaincodeDefinitions []*Chai
8880
chainid, chaincodeDefinition)
8981
continue
9082
}
83+
m.callbackStatus.setDeployPending(chainid)
9184
if err := m.invokeHandler(chainid, chaincodeDefinition, dbArtifacts); err != nil {
9285
logger.Warningf("Channel [%s]: Error while invoking a listener for handling chaincode install event: %s", chainid, err)
9386
return err
@@ -97,28 +90,35 @@ func (m *Mgr) HandleChaincodeDeploy(chainid string, chaincodeDefinitions []*Chai
9790
return nil
9891
}
9992

100-
// HandleChaincodeInstall is expected to gets invoked when a during installation of a chaincode package
93+
// ChaincodeDeployDone is expected to be called when the deploy transaction state is committed
94+
func (m *Mgr) ChaincodeDeployDone(chainid string) {
95+
// release the lock aquired in function `HandleChaincodeDeploy`
96+
defer m.rwlock.RUnlock()
97+
if m.callbackStatus.isDeployPending(chainid) {
98+
m.invokeDoneOnHandlers(chainid, true)
99+
m.callbackStatus.unsetDeployPending(chainid)
100+
}
101+
}
102+
103+
// HandleChaincodeInstall is expected to get invoked during installation of a chaincode package
101104
func (m *Mgr) HandleChaincodeInstall(chaincodeDefinition *ChaincodeDefinition, dbArtifacts []byte) error {
102105
logger.Debugf("HandleChaincodeInstall() - chaincodeDefinition=%#v", chaincodeDefinition)
103106
// Write lock prevents concurrent deploy operations
104107
m.rwlock.Lock()
105-
defer m.rwlock.Unlock()
106108
for chainid := range m.ccLifecycleListeners {
107109
logger.Debugf("Channel [%s]: Handling chaincode install event for chaincode [%s]", chainid, chaincodeDefinition)
108110
var deployed bool
109111
var err error
110-
deployed = m.isChaincodePresentInLatestDeploys(chainid, chaincodeDefinition)
111-
if !deployed {
112-
if deployed, err = m.infoProvider.IsChaincodeDeployed(chainid, chaincodeDefinition); err != nil {
113-
logger.Warningf("Channel [%s]: Error while getting the deployment status of chaincode: %s", chainid, err)
114-
return err
115-
}
112+
if deployed, err = m.infoProvider.IsChaincodeDeployed(chainid, chaincodeDefinition); err != nil {
113+
logger.Warningf("Channel [%s]: Error while getting the deployment status of chaincode: %s", chainid, err)
114+
return err
116115
}
117116
if !deployed {
118117
logger.Debugf("Channel [%s]: Chaincode [%s] is not deployed on channel hence not creating chaincode artifacts.",
119118
chainid, chaincodeDefinition)
120119
continue
121120
}
121+
m.callbackStatus.setInstallPending(chainid)
122122
if err := m.invokeHandler(chainid, chaincodeDefinition, dbArtifacts); err != nil {
123123
logger.Warningf("Channel [%s]: Error while invoking a listener for handling chaincode install event: %s", chainid, err)
124124
return err
@@ -128,23 +128,77 @@ func (m *Mgr) HandleChaincodeInstall(chaincodeDefinition *ChaincodeDefinition, d
128128
return nil
129129
}
130130

131-
func (m *Mgr) isChaincodePresentInLatestDeploys(chainid string, chaincodeDefinition *ChaincodeDefinition) bool {
132-
ccDefs, ok := m.latestChaincodeDeploys[chainid]
133-
if !ok {
134-
return false
131+
// ChaincodeInstallDone is expected to get invoked when chaincode install finishes
132+
func (m *Mgr) ChaincodeInstallDone(succeeded bool) {
133+
// release the lock acquired in function `HandleChaincodeInstall`
134+
defer m.rwlock.Unlock()
135+
for chainid := range m.callbackStatus.installPending {
136+
m.invokeDoneOnHandlers(chainid, succeeded)
137+
m.callbackStatus.unsetInstallPending(chainid)
135138
}
136-
for _, ccDef := range ccDefs {
137-
if ccDef.Name == chaincodeDefinition.Name && ccDef.Version == chaincodeDefinition.Version && bytes.Equal(ccDef.Hash, chaincodeDefinition.Hash) {
138-
return true
139+
}
140+
141+
func (m *Mgr) invokeHandler(chainid string, chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error {
142+
listeners := m.ccLifecycleListeners[chainid]
143+
for _, listener := range listeners {
144+
if err := listener.HandleChaincodeDeploy(chaincodeDefinition, dbArtifactsTar); err != nil {
145+
return err
139146
}
140147
}
141-
return false
148+
return nil
142149
}
143150

144-
func (m *Mgr) invokeHandler(chainid string, chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error {
145-
listener := m.ccLifecycleListeners[chainid]
146-
if listener == nil {
147-
return nil
151+
func (m *Mgr) invokeDoneOnHandlers(chainid string, succeeded bool) {
152+
listeners := m.ccLifecycleListeners[chainid]
153+
for _, listener := range listeners {
154+
listener.ChaincodeDeployDone(succeeded)
148155
}
149-
return listener.HandleChaincodeDeploy(chaincodeDefinition, dbArtifactsTar)
156+
}
157+
158+
type callbackStatus struct {
159+
l sync.Mutex
160+
deployPending map[string]bool
161+
installPending map[string]bool
162+
}
163+
164+
func newCallbackStatus() *callbackStatus {
165+
return &callbackStatus{
166+
deployPending: make(map[string]bool),
167+
installPending: make(map[string]bool)}
168+
}
169+
170+
func (s *callbackStatus) setDeployPending(channelID string) {
171+
s.l.Lock()
172+
defer s.l.Unlock()
173+
s.deployPending[channelID] = true
174+
}
175+
176+
func (s *callbackStatus) unsetDeployPending(channelID string) {
177+
s.l.Lock()
178+
defer s.l.Unlock()
179+
delete(s.deployPending, channelID)
180+
}
181+
182+
func (s *callbackStatus) isDeployPending(channelID string) bool {
183+
s.l.Lock()
184+
defer s.l.Unlock()
185+
return s.deployPending[channelID]
186+
}
187+
188+
func (s *callbackStatus) setInstallPending(channelID string) {
189+
s.l.Lock()
190+
defer s.l.Unlock()
191+
s.installPending[channelID] = true
192+
}
193+
194+
func (s *callbackStatus) unsetInstallPending(channelID string) {
195+
s.l.Lock()
196+
defer s.l.Unlock()
197+
delete(s.installPending, channelID)
198+
}
199+
200+
func (s *callbackStatus) isInstallPending(channelID string) bool {
201+
s.l.Lock()
202+
defer s.l.Unlock()
203+
return s.installPending[channelID]
150204
}

core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go

+5
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ func (vdb *VersionedDB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.C
119119

120120
}
121121

122+
// ChaincodeDeployDone is a noop for couchdb state impl
123+
func (vdb *VersionedDB) ChaincodeDeployDone(succeeded bool) {
124+
// NOOP
125+
}
126+
122127
// GetDBHandle gets the handle to a named database
123128
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
124129

core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,10 @@ func TestHandleChaincodeDeploy(t *testing.T) {
347347
_, err = db.ExecuteQuery("ns1", queryString)
348348
testutil.AssertError(t, err, "Error should have been thrown for a missing index")
349349

350-
handleDefinition, _ := db.(cceventmgmt.ChaincodeLifecycleEventListener)
350+
handleDefinition, ok := db.(cceventmgmt.ChaincodeLifecycleEventListener)
351+
if !ok {
352+
t.Fatalf("Couchdb state impl is expected to implement interface `cceventmgmt.ChaincodeLifecycleEventListener`")
353+
}
351354

352355
chaincodeDef := &cceventmgmt.ChaincodeDefinition{Name: "ns1", Hash: nil, Version: ""}
353356

core/scc/lscc/lscc.go

+3
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,9 @@ func (lscc *lifeCycleSysCC) executeInstall(stub shim.ChaincodeStubInterface, ccb
432432
// Note - this step is done prior to PutChaincodeToLocalStorage() since this step is idempotent and harmless until endorsements start,
433433
// that is, if there are errors deploying the indexes the chaincode install can safely be re-attempted later.
434434
err = cceventmgmt.GetMgr().HandleChaincodeInstall(chaincodeDefinition, statedbArtifactsTar)
435+
defer func() {
436+
cceventmgmt.GetMgr().ChaincodeInstallDone(err == nil)
437+
}()
435438
if err != nil {
436439
return err
437440
}

0 commit comments

Comments
 (0)