Skip to content

Commit e9fe06c

Browse files
committed
[FAB-6735] service discovery acl cache
This change set adds a cache for access control computations for service discovery. The cache caches eligibility of clients and is completely purged upon config change, and is partly purged according to its capacity Change-Id: Iec954dc60f2a34430fa7a358e7d2dce2b09cfa0d Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 4d56117 commit e9fe06c

File tree

6 files changed

+467
-33
lines changed

6 files changed

+467
-33
lines changed

discovery/api.go

+3
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,7 @@ type support interface {
3939

4040
// Config returns the channel's configuration
4141
Config(channel string) (*discovery2.ConfigResult, error)
42+
43+
// ConfigSequence returns the configuration sequence of the a given channel
44+
ConfigSequence(channel string) uint64
4245
}

discovery/auth.go

-31
This file was deleted.

discovery/authcache.go

+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package discovery
8+
9+
import (
10+
"encoding/asn1"
11+
"encoding/hex"
12+
"sync"
13+
14+
"github.com/hyperledger/fabric/common/util"
15+
"github.com/hyperledger/fabric/protos/common"
16+
"github.com/pkg/errors"
17+
)
18+
19+
const (
20+
defaultMaxCacheSize = 1000
21+
defaultRetentionRatio = 0.75
22+
)
23+
24+
var (
25+
// asBytes is the function that is used to marshal common.SignedData to bytes
26+
asBytes = asn1.Marshal
27+
)
28+
29+
type acSupport interface {
30+
// Eligible returns whether the given peer is eligible for receiving
31+
// service from the discovery service for a given channel
32+
EligibleForService(channel string, data common.SignedData) error
33+
34+
// ConfigSequence returns the configuration sequence of the a given channel
35+
ConfigSequence(channel string) uint64
36+
}
37+
38+
type authCacheConfig struct {
39+
// maxCacheSize is the maximum size of the cache, after which
40+
// a purge takes place
41+
maxCacheSize int
42+
// purgeRetentionRatio is the % of entries that remain in the cache
43+
// after the cache is purged due to overpopulation
44+
purgeRetentionRatio float32
45+
}
46+
47+
// authCache defines an interface that authenticates a request in a channel context,
48+
// and does memoization of invocations
49+
type authCache struct {
50+
credentialCache map[string]*accessCache
51+
acSupport
52+
sync.RWMutex
53+
conf authCacheConfig
54+
}
55+
56+
func newAuthCache(s acSupport, conf authCacheConfig) *authCache {
57+
return &authCache{
58+
acSupport: s,
59+
credentialCache: make(map[string]*accessCache),
60+
conf: conf,
61+
}
62+
}
63+
64+
// Eligible returns whether the given peer is eligible for receiving
65+
// service from the discovery service for a given channel
66+
func (ac *authCache) EligibleForService(channel string, data common.SignedData) error {
67+
// Check whether we already have a cache for this channel
68+
ac.RLock()
69+
cache := ac.credentialCache[channel]
70+
ac.RUnlock()
71+
if cache == nil {
72+
// Cache for given channel wasn't found, so create a new one
73+
ac.Lock()
74+
cache = ac.newAccessCache(channel)
75+
// And store the cache instance.
76+
ac.credentialCache[channel] = cache
77+
ac.Unlock()
78+
}
79+
return cache.EligibleForService(data)
80+
}
81+
82+
type accessCache struct {
83+
sync.RWMutex
84+
channel string
85+
ac *authCache
86+
lastSequence uint64
87+
entries map[string]error
88+
}
89+
90+
func (ac *authCache) newAccessCache(channel string) *accessCache {
91+
return &accessCache{
92+
channel: channel,
93+
ac: ac,
94+
entries: make(map[string]error),
95+
}
96+
}
97+
98+
func (cache *accessCache) EligibleForService(data common.SignedData) error {
99+
key, err := signedDataToKey(data)
100+
if err != nil {
101+
logger.Warningf("Failed computing key of signed data: +%v", err)
102+
return errors.Wrap(err, "failed computing key of signed data")
103+
}
104+
currSeq := cache.ac.acSupport.ConfigSequence(cache.channel)
105+
if cache.isValid(currSeq) {
106+
foundInCache, isEligibleErr := cache.lookup(key)
107+
if foundInCache {
108+
return isEligibleErr
109+
}
110+
} else {
111+
cache.configChange(currSeq)
112+
}
113+
114+
// Make sure the cache doesn't overpopulate.
115+
// It might happen that it overgrows the maximum size due to concurrent
116+
// goroutines waiting on the lock above, but that's acceptable.
117+
cache.purgeEntriesIfNeeded()
118+
119+
// Compute the eligibility of the client for the service
120+
err = cache.ac.acSupport.EligibleForService(cache.channel, data)
121+
cache.Lock()
122+
defer cache.Unlock()
123+
// Check if the sequence hasn't changed since last time
124+
if currSeq != cache.ac.acSupport.ConfigSequence(cache.channel) {
125+
// The sequence at which we computed the eligibility might have changed,
126+
// so we can't put it into the cache because a more fresh computation result
127+
// might already be present in the cache by now, and we don't want to override it
128+
// with a stale computation result, so just return the result.
129+
return err
130+
}
131+
// Else, the eligibility of the client has been computed under the latest sequence,
132+
// so store the computation result in the cache
133+
cache.entries[key] = err
134+
return err
135+
}
136+
137+
func (cache *accessCache) isPurgeNeeded() bool {
138+
cache.RLock()
139+
defer cache.RUnlock()
140+
return len(cache.entries)+1 > cache.ac.conf.maxCacheSize
141+
}
142+
143+
func (cache *accessCache) purgeEntriesIfNeeded() {
144+
if !cache.isPurgeNeeded() {
145+
return
146+
}
147+
148+
cache.Lock()
149+
defer cache.Unlock()
150+
151+
maxCacheSize := cache.ac.conf.maxCacheSize
152+
purgeRatio := cache.ac.conf.purgeRetentionRatio
153+
entries2evict := maxCacheSize - int(purgeRatio*float32(maxCacheSize))
154+
155+
for key := range cache.entries {
156+
if entries2evict == 0 {
157+
return
158+
}
159+
entries2evict--
160+
delete(cache.entries, key)
161+
}
162+
}
163+
164+
func (cache *accessCache) isValid(currSeq uint64) bool {
165+
cache.RLock()
166+
defer cache.RUnlock()
167+
return currSeq == cache.lastSequence
168+
}
169+
170+
func (cache *accessCache) configChange(currSeq uint64) {
171+
cache.Lock()
172+
defer cache.Unlock()
173+
cache.lastSequence = currSeq
174+
// Invalidate entries
175+
cache.entries = make(map[string]error)
176+
}
177+
178+
func (cache *accessCache) lookup(key string) (cacheHit bool, lookupResult error) {
179+
cache.RLock()
180+
defer cache.RUnlock()
181+
182+
lookupResult, cacheHit = cache.entries[key]
183+
return
184+
}
185+
186+
func signedDataToKey(data common.SignedData) (string, error) {
187+
b, err := asBytes(data)
188+
if err != nil {
189+
return "", errors.Wrap(err, "failed marshaling signed data")
190+
}
191+
return hex.EncodeToString(util.ComputeSHA256(b)), nil
192+
}

0 commit comments

Comments
 (0)