Skip to content

Commit 04ce8e3

Browse files
authored
Deprecate old namespace change callback (#3774)
1 parent 3882bb6 commit 04ce8e3

16 files changed

+140
-503
lines changed

common/metrics/metric_defs.go

-1
Original file line numberDiff line numberDiff line change
@@ -1562,7 +1562,6 @@ var (
15621562
ShardControllerLockLatency = NewTimerDef("shard_controller_lock_latency")
15631563
ShardLockLatency = NewTimerDef("shard_lock_latency")
15641564
NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency")
1565-
NamespaceRegistryCallbackLockLatency = NewTimerDef("namespace_registry_callback_lock_latency")
15661565
ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter")
15671566

15681567
// Matching

common/namespace/namespace.go

-15
Original file line numberDiff line numberDiff line change
@@ -241,21 +241,6 @@ func (ns *Namespace) GetCustomData(key string) string {
241241
return ns.info.Data[key]
242242
}
243243

244-
// Len return length
245-
func (t Namespaces) Len() int {
246-
return len(t)
247-
}
248-
249-
// Swap implements sort.Interface.
250-
func (t Namespaces) Swap(i, j int) {
251-
t[i], t[j] = t[j], t[i]
252-
}
253-
254-
// Less implements sort.Interface
255-
func (t Namespaces) Less(i, j int) bool {
256-
return t[i].notificationVersion < t[j].notificationVersion
257-
}
258-
259244
// Retention returns retention duration for this namespace.
260245
func (ns *Namespace) Retention() time.Duration {
261246
if ns.config.Retention == nil {

common/namespace/registry.go

+21-179
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ package namespace
2828

2929
import (
3030
"context"
31-
"sort"
3231
"sync"
3332
"sync/atomic"
3433
"time"
3534

3635
"go.temporal.io/api/serviceerror"
37-
"golang.org/x/exp/maps"
3836

3937
"go.temporal.io/server/common"
4038
"go.temporal.io/server/common/cache"
@@ -131,18 +129,16 @@ type (
131129
Registry interface {
132130
common.Daemon
133131
common.Pingable
134-
RegisterNamespaceChangeCallback(listenerID any, initialNotificationVersion int64, prepareCallback PrepareCallbackFn, callback CallbackFn)
135-
UnregisterNamespaceChangeCallback(listenerID any)
136132
GetNamespace(name Name) (*Namespace, error)
137133
GetNamespaceByID(id ID) (*Namespace, error)
138134
GetNamespaceID(name Name) (ID, error)
139135
GetNamespaceName(id ID) (Name, error)
140136
GetCacheSize() (sizeOfCacheByName int64, sizeOfCacheByID int64)
141137
// Refresh forces an immediate refresh of the namespace cache and blocks until it's complete.
142138
Refresh()
143-
// Registers callback for namespace state changes. This is regrettably
144-
// different from the above RegisterNamespaceChangeCallback because we
145-
// need different semantics.
139+
// Registers callback for namespace state changes.
140+
// StateChangeCallbackFn will be invoked for a new/deleted namespace or namespace that has
141+
// State, ReplicationState, ActiveCluster, or isGlobalNamespace config changed.
146142
RegisterStateChangeCallback(key any, cb StateChangeCallbackFn)
147143
UnregisterStateChangeCallback(key any)
148144
}
@@ -156,24 +152,14 @@ type (
156152
clock Clock
157153
metricsHandler metrics.Handler
158154
logger log.Logger
159-
lastRefreshTime atomic.Value
160155
refreshInterval dynamicconfig.DurationPropertyFn
161156

162157
// cacheLock protects cachNameToID, cacheByID and stateChangeCallbacks.
163158
// If the exclusive side is to be held at the same time as the
164159
// callbackLock (below), this lock MUST be acquired *first*.
165-
cacheLock sync.RWMutex
166-
cacheNameToID cache.Cache
167-
cacheByID cache.Cache
168-
169-
// callbackLock protects prepareCallbacks and callbacks. Do not call
170-
// cacheLock.Lock() (the other lock in this struct, above) while holding
171-
// this lock or you risk a deadlock.
172-
callbackLock sync.Mutex
173-
prepareCallbacks map[any]PrepareCallbackFn
174-
callbacks map[any]CallbackFn
175-
176-
// State-change callbacks. Protected by cacheLock
160+
cacheLock sync.RWMutex
161+
cacheNameToID cache.Cache
162+
cacheByID cache.Cache
177163
stateChangeCallbacks map[any]StateChangeCallbackFn
178164
}
179165
)
@@ -196,12 +182,9 @@ func NewRegistry(
196182
logger: logger,
197183
cacheNameToID: cache.New(cacheMaxSize, &cacheOpts),
198184
cacheByID: cache.New(cacheMaxSize, &cacheOpts),
199-
prepareCallbacks: make(map[any]PrepareCallbackFn),
200-
callbacks: make(map[any]CallbackFn),
201185
refreshInterval: refreshInterval,
202186
stateChangeCallbacks: make(map[any]StateChangeCallbackFn),
203187
}
204-
reg.lastRefreshTime.Store(time.Time{})
205188
return reg
206189
}
207190

@@ -257,18 +240,6 @@ func (r *registry) GetPingChecks() []common.PingCheck {
257240
},
258241
MetricsName: metrics.NamespaceRegistryLockLatency.GetMetricName(),
259242
},
260-
{
261-
Name: "namespace registry callback lock",
262-
// we don't do any persistence ops, this shouldn't be blocked
263-
Timeout: 10 * time.Second,
264-
Ping: func() []common.Pingable {
265-
r.callbackLock.Lock()
266-
//lint:ignore SA2001 just checking if we can acquire the lock
267-
r.callbackLock.Unlock()
268-
return nil
269-
},
270-
MetricsName: metrics.NamespaceRegistryCallbackLockLatency.GetMetricName(),
271-
},
272243
}
273244
}
274245

@@ -292,53 +263,6 @@ func (r *registry) getAllNamespaceLocked() map[ID]*Namespace {
292263
return result
293264
}
294265

295-
// RegisterNamespaceChangeCallback set a namespace change callback WARN:
296-
// callback functions MUST NOT call back into this registry instance, either to
297-
// unregister themselves or to look up Namespaces.
298-
func (r *registry) RegisterNamespaceChangeCallback(
299-
listenerID any,
300-
initialNotificationVersion int64,
301-
prepareCallback PrepareCallbackFn,
302-
callback CallbackFn,
303-
) {
304-
305-
r.callbackLock.Lock()
306-
r.prepareCallbacks[listenerID] = prepareCallback
307-
r.callbacks[listenerID] = callback
308-
r.callbackLock.Unlock()
309-
310-
// this section is trying to make the shard catch up with namespace changes
311-
namespaces := Namespaces(maps.Values(r.getAllNamespace()))
312-
// we mush notify the change in a ordered fashion
313-
// since history shard have to update the shard info
314-
// with namespace change version.
315-
sort.Sort(namespaces)
316-
317-
var oldEntries []*Namespace
318-
var newEntries []*Namespace
319-
for _, namespace := range namespaces {
320-
if namespace.notificationVersion >= initialNotificationVersion {
321-
oldEntries = append(oldEntries, nil)
322-
newEntries = append(newEntries, namespace)
323-
}
324-
}
325-
if len(oldEntries) > 0 {
326-
prepareCallback()
327-
callback(oldEntries, newEntries)
328-
}
329-
}
330-
331-
// UnregisterNamespaceChangeCallback delete a namespace failover callback
332-
func (r *registry) UnregisterNamespaceChangeCallback(
333-
listenerID any,
334-
) {
335-
r.callbackLock.Lock()
336-
defer r.callbackLock.Unlock()
337-
338-
delete(r.prepareCallbacks, listenerID)
339-
delete(r.callbacks, listenerID)
340-
}
341-
342266
func (r *registry) RegisterStateChangeCallback(key any, cb StateChangeCallbackFn) {
343267
r.cacheLock.Lock()
344268
r.stateChangeCallbacks[key] = cb
@@ -444,21 +368,13 @@ func (r *registry) refreshLoop(ctx context.Context) error {
444368
}
445369
}
446370
if replyCh != nil {
447-
replyCh <- struct{}{}
371+
replyCh <- struct{}{} // TODO: close replyCh?
448372
}
449373
}
450374
}
451375
}
452376

453377
func (r *registry) refreshNamespaces(ctx context.Context) error {
454-
// first load the metadata record, then load namespaces
455-
// this can guarantee that namespaces in the cache are not updated more than metadata record
456-
metadata, err := r.persistence.GetMetadata(ctx)
457-
if err != nil {
458-
return err
459-
}
460-
namespaceNotificationVersion := metadata.NotificationVersion
461-
462378
request := &persistence.ListNamespacesRequest{
463379
PageSize: CacheRefreshPageSize,
464380
IncludeDeleted: true,
@@ -481,10 +397,6 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {
481397
request.NextPageToken = response.NextPageToken
482398
}
483399

484-
// Sort namespaces by notification version because changes must be applied in this order
485-
// because history shard has to update the shard info with namespace change version.
486-
sort.Sort(namespacesDb)
487-
488400
// Make a copy of the existing namespace cache (excluding deleted), so we can calculate diff and do "compare and swap".
489401
newCacheNameToID := cache.New(cacheMaxSize, &cacheOpts)
490402
newCacheByID := cache.New(cacheMaxSize, &cacheOpts)
@@ -498,49 +410,29 @@ func (r *registry) refreshNamespaces(ctx context.Context) error {
498410
newCacheByID.Put(ID(namespace.info.Id), namespace)
499411
}
500412

501-
var oldEntries []*Namespace
502-
var newEntries []*Namespace
503413
var stateChanged []*Namespace
504-
UpdateLoop:
505414
for _, namespace := range namespacesDb {
506-
if namespace.notificationVersion >= namespaceNotificationVersion {
507-
// this guarantee that namespace change events before the
508-
// namespaceNotificationVersion is loaded into the cache.
509-
510-
// the namespace change events after the namespaceNotificationVersion
511-
// will be loaded into cache in the next refresh
512-
break UpdateLoop
513-
}
514-
oldNS, oldNSAnyVersion := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
415+
oldNS := r.updateIDToNamespaceCache(newCacheByID, namespace.ID(), namespace)
515416
newCacheNameToID.Put(namespace.Name(), namespace.ID())
516417

517-
if oldNS != nil {
518-
oldEntries = append(oldEntries, oldNS)
519-
newEntries = append(newEntries, namespace)
520-
}
521-
522418
// this test should include anything that might affect whether a namespace is active on
523419
// this cluster.
524-
if oldNSAnyVersion == nil ||
525-
oldNSAnyVersion.State() != namespace.State() ||
526-
oldNSAnyVersion.IsGlobalNamespace() != namespace.IsGlobalNamespace() ||
527-
oldNSAnyVersion.ActiveClusterName() != namespace.ActiveClusterName() {
420+
if oldNS == nil ||
421+
oldNS.State() != namespace.State() ||
422+
oldNS.IsGlobalNamespace() != namespace.IsGlobalNamespace() ||
423+
oldNS.ActiveClusterName() != namespace.ActiveClusterName() ||
424+
oldNS.ReplicationState() != namespace.ReplicationState() {
528425
stateChanged = append(stateChanged, namespace)
529426
}
530427
}
531428

532429
var stateChangeCallbacks []StateChangeCallbackFn
533430

534-
// NOTE: READ REF BEFORE MODIFICATION
535-
// ref: historyEngine.go registerNamespaceFailoverCallback function
536-
r.publishCacheUpdate(func() (Namespaces, Namespaces) {
537-
r.cacheLock.Lock()
538-
defer r.cacheLock.Unlock()
539-
r.cacheByID = newCacheByID
540-
r.cacheNameToID = newCacheNameToID
541-
stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks)
542-
return oldEntries, newEntries
543-
})
431+
r.cacheLock.Lock()
432+
r.cacheByID = newCacheByID
433+
r.cacheNameToID = newCacheNameToID
434+
stateChangeCallbacks = mapAnyValues(r.stateChangeCallbacks)
435+
r.cacheLock.Unlock()
544436

545437
// call state change callbacks
546438
for _, cb := range stateChangeCallbacks {
@@ -559,15 +451,12 @@ func (r *registry) updateIDToNamespaceCache(
559451
cacheByID cache.Cache,
560452
id ID,
561453
newNS *Namespace,
562-
) (*Namespace, *Namespace) {
454+
) (oldNS *Namespace) {
563455
oldCacheRec := cacheByID.Put(id, newNS)
564456
if oldNS, ok := oldCacheRec.(*Namespace); ok {
565-
if newNS.notificationVersion > oldNS.notificationVersion && r.globalNamespacesEnabled {
566-
return oldNS, oldNS
567-
}
568-
return nil, oldNS
457+
return oldNS
569458
}
570-
return nil, nil
459+
return nil
571460
}
572461

573462
// getNamespace retrieves the information from the cache if it exists
@@ -594,53 +483,6 @@ func (r *registry) getNamespaceByIDLocked(id ID) (*Namespace, error) {
594483
return nil, serviceerror.NewNamespaceNotFound(id.String())
595484
}
596485

597-
func (r *registry) publishCacheUpdate(
598-
updateCache func() (Namespaces, Namespaces),
599-
) {
600-
now := r.clock.Now()
601-
602-
prepareCallbacks, callbacks := r.getNamespaceChangeCallbacks()
603-
604-
r.triggerNamespaceChangePrepareCallback(prepareCallbacks)
605-
oldEntries, newEntries := updateCache()
606-
r.triggerNamespaceChangeCallback(callbacks, oldEntries, newEntries)
607-
r.lastRefreshTime.Store(now)
608-
}
609-
610-
func (r *registry) getNamespaceChangeCallbacks() ([]PrepareCallbackFn, []CallbackFn) {
611-
r.callbackLock.Lock()
612-
defer r.callbackLock.Unlock()
613-
return mapAnyValues(r.prepareCallbacks), mapAnyValues(r.callbacks)
614-
}
615-
616-
func (r *registry) triggerNamespaceChangePrepareCallback(
617-
prepareCallbacks []PrepareCallbackFn,
618-
) {
619-
startTime := time.Now().UTC()
620-
defer func() {
621-
r.metricsHandler.Timer(metrics.NamespaceCachePrepareCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
622-
}()
623-
624-
for _, prepareCallback := range prepareCallbacks {
625-
prepareCallback()
626-
}
627-
}
628-
629-
func (r *registry) triggerNamespaceChangeCallback(
630-
callbacks []CallbackFn,
631-
oldNamespaces []*Namespace,
632-
newNamespaces []*Namespace,
633-
) {
634-
startTime := time.Now().UTC()
635-
defer func() {
636-
r.metricsHandler.Timer(metrics.NamespaceCacheCallbacksLatency.GetMetricName()).Record(time.Since(startTime))
637-
}()
638-
639-
for _, callback := range callbacks {
640-
callback(oldNamespaces, newNamespaces)
641-
}
642-
}
643-
644486
// This is https://pkg.go.dev/golang.org/x/exp/maps#Values except that it works
645487
// for map[any]T (see https://github.com/golang/go/issues/51257 and many more)
646488
func mapAnyValues[T any](m map[any]T) []T {

common/namespace/registry_mock.go

-24
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)