Skip to content

Commit 06188f3

Browse files
authored
Suport dual visibility in visibility persistence checks (#3968)
1 parent 93fb0ec commit 06188f3

18 files changed

+141
-62
lines changed

common/persistence/visibility/defs.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,18 @@ func DefaultAdvancedVisibilityWritingMode(advancedVisibilityConfigExist bool) st
4747
return AdvancedVisibilityWritingModeOff
4848
}
4949

50-
func AllowListForValidation(pluginName string) bool {
51-
switch pluginName {
50+
func AllowListForValidation(storeNames []string) bool {
51+
if len(storeNames) == 0 {
52+
return false
53+
}
54+
55+
if len(storeNames) > 1 {
56+
// If more than one store is configured then it means that dual visibility is enabled.
57+
// Dual visibility is used for migration to advanced, don't allow list of values because it will be removed soon.
58+
return false
59+
}
60+
61+
switch storeNames[0] {
5262
case mysql.PluginNameV8, postgresql.PluginNameV12, sqlite.PluginName:
5363
// Advanced visibility with SQL DB don't support list of values
5464
return false

common/persistence/visibility/manager/visibility_manager.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ type (
4343
// VisibilityManager is used to manage the visibility store
4444
VisibilityManager interface {
4545
persistence.Closeable
46-
GetName() string
46+
GetReadStoreName(nsName namespace.Name) string
47+
GetStoreNames() []string
48+
HasStoreName(stName string) bool
4749
GetIndexName() string
4850

4951
// Write APIs.

common/persistence/visibility/manager/visibility_manager_mock.go

+35-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/visibility/visibility_manager_dual.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ package visibility
2626

2727
import (
2828
"context"
29-
"strings"
3029

30+
"go.temporal.io/server/common/namespace"
3131
"go.temporal.io/server/common/persistence/visibility/manager"
3232
)
3333

@@ -60,8 +60,21 @@ func (v *visibilityManagerDual) Close() {
6060
v.secondaryVisibilityManager.Close()
6161
}
6262

63-
func (v *visibilityManagerDual) GetName() string {
64-
return strings.Join([]string{v.visibilityManager.GetName(), v.secondaryVisibilityManager.GetName()}, ",")
63+
func (v *visibilityManagerDual) GetReadStoreName(nsName namespace.Name) string {
64+
return v.managerSelector.readManager(nsName).GetReadStoreName(nsName)
65+
}
66+
67+
func (v *visibilityManagerDual) GetStoreNames() []string {
68+
return append(v.visibilityManager.GetStoreNames(), v.secondaryVisibilityManager.GetStoreNames()...)
69+
}
70+
71+
func (v *visibilityManagerDual) HasStoreName(stName string) bool {
72+
for _, sn := range v.GetStoreNames() {
73+
if sn == stName {
74+
return true
75+
}
76+
}
77+
return false
6578
}
6679

6780
func (v *visibilityManagerDual) GetIndexName() string {

common/persistence/visibility/visibility_manager_impl.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
workflowpb "go.temporal.io/api/workflow/v1"
3737

3838
"go.temporal.io/server/common/log"
39+
"go.temporal.io/server/common/namespace"
3940
"go.temporal.io/server/common/persistence/visibility/manager"
4041
"go.temporal.io/server/common/persistence/visibility/store"
4142
)
@@ -72,10 +73,18 @@ func (p *visibilityManagerImpl) Close() {
7273
p.store.Close()
7374
}
7475

75-
func (p *visibilityManagerImpl) GetName() string {
76+
func (p *visibilityManagerImpl) GetReadStoreName(_ namespace.Name) string {
7677
return p.store.GetName()
7778
}
7879

80+
func (p *visibilityManagerImpl) GetStoreNames() []string {
81+
return []string{p.store.GetName()}
82+
}
83+
84+
func (p *visibilityManagerImpl) HasStoreName(stName string) bool {
85+
return p.store.GetName() == stName
86+
}
87+
7988
func (p *visibilityManagerImpl) GetIndexName() string {
8089
return p.store.GetIndexName()
8190
}

common/persistence/visibility/visibility_manager_rate_limited.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929

3030
"go.temporal.io/server/common/dynamicconfig"
31+
"go.temporal.io/server/common/namespace"
3132
"go.temporal.io/server/common/persistence"
3233
"go.temporal.io/server/common/persistence/visibility/manager"
3334
"go.temporal.io/server/common/quotas"
@@ -63,8 +64,16 @@ func (m *visibilityManagerRateLimited) Close() {
6364
m.delegate.Close()
6465
}
6566

66-
func (m *visibilityManagerRateLimited) GetName() string {
67-
return m.delegate.GetName()
67+
func (m *visibilityManagerRateLimited) GetReadStoreName(nsName namespace.Name) string {
68+
return m.delegate.GetReadStoreName(nsName)
69+
}
70+
71+
func (m *visibilityManagerRateLimited) GetStoreNames() []string {
72+
return m.delegate.GetStoreNames()
73+
}
74+
75+
func (m *visibilityManagerRateLimited) HasStoreName(stName string) bool {
76+
return m.delegate.HasStoreName(stName)
6877
}
6978

7079
func (m *visibilityManagerRateLimited) GetIndexName() string {

common/persistence/visibility/visiblity_manager_metrics.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"go.temporal.io/server/common/log"
3434
"go.temporal.io/server/common/log/tag"
3535
"go.temporal.io/server/common/metrics"
36+
"go.temporal.io/server/common/namespace"
3637
"go.temporal.io/server/common/persistence"
3738
"go.temporal.io/server/common/persistence/visibility/manager"
3839
)
@@ -65,8 +66,16 @@ func (m *visibilityManagerMetrics) Close() {
6566
m.delegate.Close()
6667
}
6768

68-
func (m *visibilityManagerMetrics) GetName() string {
69-
return m.delegate.GetName()
69+
func (m *visibilityManagerMetrics) GetReadStoreName(nsName namespace.Name) string {
70+
return m.delegate.GetReadStoreName(nsName)
71+
}
72+
73+
func (m *visibilityManagerMetrics) GetStoreNames() []string {
74+
return m.delegate.GetStoreNames()
75+
}
76+
77+
func (m *visibilityManagerMetrics) HasStoreName(stName string) bool {
78+
return m.delegate.HasStoreName(stName)
7079
}
7180

7281
func (m *visibilityManagerMetrics) GetIndexName() string {

service/frontend/adminHandler.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func (adh *AdminHandler) AddSearchAttributes(
275275
// register the search attributes in the cluster metadata if ES is up or if
276276
// `skip-schema-update` is set. This is for backward compatibility using
277277
// standard visibility.
278-
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
278+
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
279279
err = adh.addSearchAttributesElasticsearch(ctx, request, indexName)
280280
} else {
281281
err = adh.addSearchAttributesSQL(ctx, request, currentSearchAttributes)
@@ -414,7 +414,7 @@ func (adh *AdminHandler) RemoveSearchAttributes(
414414
// register the search attributes in the cluster metadata if ES is up or if
415415
// `skip-schema-update` is set. This is for backward compatibility using
416416
// standard visibility.
417-
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
417+
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
418418
err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName)
419419
} else {
420420
err = adh.removeSearchAttributesSQL(ctx, request)
@@ -520,7 +520,7 @@ func (adh *AdminHandler) GetSearchAttributes(
520520
// register the search attributes in the cluster metadata if ES is up or if
521521
// `skip-schema-update` is set. This is for backward compatibility using
522522
// standard visibility.
523-
if adh.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
523+
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
524524
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
525525
}
526526
return adh.getSearchAttributesSQL(request, searchAttributes)
@@ -1038,7 +1038,7 @@ func (adh *AdminHandler) DescribeCluster(
10381038
ClusterName: metadata.ClusterName,
10391039
HistoryShardCount: metadata.HistoryShardCount,
10401040
PersistenceStore: adh.persistenceExecutionManager.GetName(),
1041-
VisibilityStore: adh.visibilityMgr.GetName(),
1041+
VisibilityStore: strings.Join(adh.visibilityMgr.GetStoreNames(), ","),
10421042
VersionInfo: metadata.VersionInfo,
10431043
FailoverVersionIncrement: metadata.FailoverVersionIncrement,
10441044
InitialFailoverVersion: metadata.InitialFailoverVersion,
@@ -1628,7 +1628,7 @@ func (adh *AdminHandler) DeleteWorkflowExecution(
16281628
var warnings []string
16291629
var branchTokens [][]byte
16301630
var startTime, closeTime *time.Time
1631-
cassVisBackend := strings.Contains(adh.visibilityMgr.GetName(), cassandra.CassandraPersistenceName)
1631+
cassVisBackend := adh.visibilityMgr.HasStoreName(cassandra.CassandraPersistenceName)
16321632

16331633
resp, err := adh.persistenceExecutionManager.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{
16341634
ShardID: shardID,

service/frontend/adminHandler_test.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"time"
3333

3434
"go.temporal.io/server/common/clock"
35+
"go.temporal.io/server/common/persistence/visibility/store/standard/cassandra"
3536
"go.temporal.io/server/common/resourcetest"
3637

3738
"google.golang.org/grpc/health"
@@ -553,7 +554,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() {
553554

554555
mockSdkClient := mocksdk.NewMockClient(s.controller)
555556
s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
556-
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
557+
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
557558

558559
// Start workflow failed.
559560
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-add-search-attributes-workflow", gomock.Any()).Return(nil, errors.New("start failed"))
@@ -606,7 +607,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_EmptyIndexName() {
606607
s.mockNamespaceCache.EXPECT().GetNamespace(s.namespace).Return(s.namespaceEntry, nil).AnyTimes()
607608

608609
// Elasticsearch is not configured
609-
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
610+
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
610611
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes()
611612
mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return(
612613
&workflowservice.DescribeWorkflowExecutionResponse{}, nil)
@@ -626,7 +627,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes_NonEmptyIndexName() {
626627
s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
627628

628629
// Configure Elasticsearch: add advanced visibility store config with index name.
629-
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
630+
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
630631
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes()
631632

632633
mockSdkClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), "temporal-sys-add-search-attributes-workflow", "").Return(
@@ -685,7 +686,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_EmptyIndexName() {
685686
}
686687

687688
// Elasticsearch is not configured
688-
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
689+
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
689690
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("").AnyTimes()
690691
s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes()
691692
testCases2 := []test{
@@ -748,7 +749,7 @@ func (s *adminHandlerSuite) Test_RemoveSearchAttributes_NonEmptyIndexName() {
748749
}
749750

750751
// Configure Elasticsearch: add advanced visibility store config with index name.
751-
s.mockVisibilityMgr.EXPECT().GetName().Return(elasticsearch.PersistenceName).AnyTimes()
752+
s.mockVisibilityMgr.EXPECT().HasStoreName(elasticsearch.PersistenceName).Return(true).AnyTimes()
752753
s.mockVisibilityMgr.EXPECT().GetIndexName().Return("random-index-name").AnyTimes()
753754
s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes()
754755
for _, testCase := range testCases {
@@ -1162,7 +1163,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_CurrentCluster_Success() {
11621163
s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{})
11631164
s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0)
11641165
s.mockResource.ExecutionMgr.EXPECT().GetName().Return("")
1165-
s.mockVisibilityMgr.EXPECT().GetName().Return("")
1166+
s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName})
11661167
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
11671168
&persistence.GetClusterMetadataResponse{
11681169
ClusterMetadata: persistencespb.ClusterMetadata{
@@ -1201,7 +1202,7 @@ func (s *adminHandlerSuite) Test_DescribeCluster_NonCurrentCluster_Success() {
12011202
s.mockResource.WorkerServiceResolver.EXPECT().Members().Return([]*membership.HostInfo{})
12021203
s.mockResource.WorkerServiceResolver.EXPECT().MemberCount().Return(0)
12031204
s.mockResource.ExecutionMgr.EXPECT().GetName().Return("")
1204-
s.mockVisibilityMgr.EXPECT().GetName().Return("")
1205+
s.mockVisibilityMgr.EXPECT().GetStoreNames().Return([]string{elasticsearch.PersistenceName})
12051206
s.mockClusterMetadataManager.EXPECT().GetClusterMetadata(gomock.Any(), &persistence.GetClusterMetadataRequest{ClusterName: clusterName}).Return(
12061207
&persistence.GetClusterMetadataResponse{
12071208
ClusterMetadata: persistencespb.ClusterMetadata{
@@ -1257,7 +1258,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_DeleteCurrentExecution()
12571258
}
12581259

12591260
s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
1260-
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes()
1261+
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false)
12611262

12621263
s.mockExecutionMgr.EXPECT().GetCurrentExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error"))
12631264
resp, err := s.handler.DeleteWorkflowExecution(context.Background(), request)
@@ -1332,7 +1333,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_LoadMutableStateFailed()
13321333
}
13331334

13341335
s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
1335-
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch").AnyTimes()
1336+
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(false)
13361337

13371338
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, errors.New("some random error"))
13381339
s.mockHistoryClient.EXPECT().DeleteWorkflowVisibilityRecord(gomock.Any(), gomock.Any()).Return(&historyservice.DeleteWorkflowVisibilityRecordResponse{}, nil)
@@ -1355,7 +1356,7 @@ func (s *adminHandlerSuite) TestDeleteWorkflowExecution_CassandraVisibilityBacke
13551356
}
13561357

13571358
s.mockNamespaceCache.EXPECT().GetNamespaceID(s.namespace).Return(s.namespaceID, nil).AnyTimes()
1358-
s.mockVisibilityMgr.EXPECT().GetName().Return("elasticsearch,cassandra").AnyTimes()
1359+
s.mockVisibilityMgr.EXPECT().HasStoreName(cassandra.CassandraPersistenceName).Return(true).AnyTimes()
13591360

13601361
// test delete open records
13611362
branchToken := []byte("branchToken")

service/frontend/operator_handler.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(
194194
// register the search attributes in the cluster metadata if ES is up or if
195195
// `skip-schema-update` is set. This is for backward compatibility using
196196
// standard visibility.
197-
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
197+
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
198198
err = h.addSearchAttributesElasticsearch(ctx, request, indexName, currentSearchAttributes)
199199
if err != nil {
200200
if _, isWorkflowErr := err.(*serviceerror.SystemWorkflow); isWorkflowErr {
@@ -352,7 +352,7 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes(
352352
// register the search attributes in the cluster metadata if ES is up or if
353353
// `skip-schema-update` is set. This is for backward compatibility using
354354
// standard visibility.
355-
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
355+
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
356356
err = h.removeSearchAttributesElasticsearch(ctx, request, indexName)
357357
} else {
358358
err = h.removeSearchAttributesSQL(ctx, request)
@@ -457,7 +457,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
457457
// register the search attributes in the cluster metadata if ES is up or if
458458
// `skip-schema-update` is set. This is for backward compatibility using
459459
// standard visibility.
460-
if h.visibilityMgr.GetName() == elasticsearch.PersistenceName || indexName == "" {
460+
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
461461
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
462462
}
463463
return h.listSearchAttributesSQL(request, searchAttributes)

0 commit comments

Comments
 (0)