Skip to content

Commit 750fc54

Browse files
authored
Implement searchattribute.MapperProvider (#3873)
1 parent cb22aab commit 750fc54

33 files changed

+393
-272
lines changed

common/persistence/visibility/factory.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewManager(
4949
esClient esclient.Client,
5050
esProcessorConfig *elasticsearch.ProcessorConfig,
5151
searchAttributesProvider searchattribute.Provider,
52-
searchAttributesMapper searchattribute.Mapper,
52+
searchAttributesMapperProvider searchattribute.MapperProvider,
5353

5454
standardVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn,
5555
standardVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn,
@@ -81,7 +81,7 @@ func NewManager(
8181
esClient,
8282
esProcessorConfig,
8383
searchAttributesProvider,
84-
searchAttributesMapper,
84+
searchAttributesMapperProvider,
8585
advancedVisibilityPersistenceMaxReadQPS,
8686
advancedVisibilityPersistenceMaxWriteQPS,
8787
visibilityDisableOrderByClause,
@@ -97,7 +97,7 @@ func NewManager(
9797
esClient,
9898
esProcessorConfig,
9999
searchAttributesProvider,
100-
searchAttributesMapper,
100+
searchAttributesMapperProvider,
101101
advancedVisibilityPersistenceMaxReadQPS,
102102
advancedVisibilityPersistenceMaxWriteQPS,
103103
visibilityDisableOrderByClause,
@@ -187,7 +187,7 @@ func NewAdvancedManager(
187187
esClient esclient.Client,
188188
esProcessorConfig *elasticsearch.ProcessorConfig,
189189
searchAttributesProvider searchattribute.Provider,
190-
searchAttributesMapper searchattribute.Mapper,
190+
searchAttributesMapperProvider searchattribute.MapperProvider,
191191

192192
advancedVisibilityPersistenceMaxReadQPS dynamicconfig.IntPropertyFn,
193193
advancedVisibilityPersistenceMaxWriteQPS dynamicconfig.IntPropertyFn,
@@ -205,7 +205,7 @@ func NewAdvancedManager(
205205
esClient,
206206
esProcessorConfig,
207207
searchAttributesProvider,
208-
searchAttributesMapper,
208+
searchAttributesMapperProvider,
209209
visibilityDisableOrderByClause,
210210
metricsHandler,
211211
logger)
@@ -289,7 +289,7 @@ func newAdvancedVisibilityStore(
289289
esClient esclient.Client,
290290
esProcessorConfig *elasticsearch.ProcessorConfig,
291291
searchAttributesProvider searchattribute.Provider,
292-
searchAttributesMapper searchattribute.Mapper,
292+
searchAttributesMapperProvider searchattribute.MapperProvider,
293293
visibilityDisableOrderByClause dynamicconfig.BoolPropertyFn,
294294
metricsHandler metrics.Handler,
295295
logger log.Logger,
@@ -311,7 +311,7 @@ func newAdvancedVisibilityStore(
311311
esClient,
312312
defaultIndexName,
313313
searchAttributesProvider,
314-
searchAttributesMapper,
314+
searchAttributesMapperProvider,
315315
esProcessor,
316316
esProcessorAckTimeout,
317317
visibilityDisableOrderByClause,

common/persistence/visibility/store/elasticsearch/query_interceptors.go

+18-13
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ import (
3939

4040
type (
4141
nameInterceptor struct {
42-
namespace namespace.Name
43-
index string
44-
searchAttributesTypeMap searchattribute.NameTypeMap
45-
searchAttributesMapper searchattribute.Mapper
46-
seenNamespaceDivision bool
42+
namespace namespace.Name
43+
index string
44+
searchAttributesTypeMap searchattribute.NameTypeMap
45+
searchAttributesMapperProvider searchattribute.MapperProvider
46+
seenNamespaceDivision bool
4747
}
4848
valuesInterceptor struct{}
4949
)
@@ -52,13 +52,13 @@ func newNameInterceptor(
5252
namespace namespace.Name,
5353
index string,
5454
saTypeMap searchattribute.NameTypeMap,
55-
searchAttributesMapper searchattribute.Mapper,
55+
searchAttributesMapperProvider searchattribute.MapperProvider,
5656
) *nameInterceptor {
5757
return &nameInterceptor{
58-
namespace: namespace,
59-
index: index,
60-
searchAttributesTypeMap: saTypeMap,
61-
searchAttributesMapper: searchAttributesMapper,
58+
namespace: namespace,
59+
index: index,
60+
searchAttributesTypeMap: saTypeMap,
61+
searchAttributesMapperProvider: searchAttributesMapperProvider,
6262
}
6363
}
6464

@@ -68,12 +68,17 @@ func NewValuesInterceptor() *valuesInterceptor {
6868

6969
func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string, error) {
7070
fieldName := name
71-
if searchattribute.IsMappable(name) && ni.searchAttributesMapper != nil {
72-
var err error
73-
fieldName, err = ni.searchAttributesMapper.GetFieldName(name, ni.namespace.String())
71+
if searchattribute.IsMappable(name) {
72+
mapper, err := ni.searchAttributesMapperProvider.GetMapper(ni.namespace)
7473
if err != nil {
7574
return "", err
7675
}
76+
if mapper != nil {
77+
fieldName, err = mapper.GetFieldName(name, ni.namespace.String())
78+
if err != nil {
79+
return "", err
80+
}
81+
}
7782
}
7883

7984
fieldType, err := ni.searchAttributesTypeMap.GetType(fieldName)

common/persistence/visibility/store/elasticsearch/visibility_store.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,14 @@ var defaultSorter = []elastic.Sorter{
6868

6969
type (
7070
visibilityStore struct {
71-
esClient client.Client
72-
index string
73-
searchAttributesProvider searchattribute.Provider
74-
searchAttributesMapper searchattribute.Mapper
75-
processor Processor
76-
processorAckTimeout dynamicconfig.DurationPropertyFn
77-
disableOrderByClause dynamicconfig.BoolPropertyFn
78-
metricsHandler metrics.Handler
71+
esClient client.Client
72+
index string
73+
searchAttributesProvider searchattribute.Provider
74+
searchAttributesMapperProvider searchattribute.MapperProvider
75+
processor Processor
76+
processorAckTimeout dynamicconfig.DurationPropertyFn
77+
disableOrderByClause dynamicconfig.BoolPropertyFn
78+
metricsHandler metrics.Handler
7979
}
8080

8181
visibilityPageToken struct {
@@ -100,22 +100,22 @@ func NewVisibilityStore(
100100
esClient client.Client,
101101
index string,
102102
searchAttributesProvider searchattribute.Provider,
103-
searchAttributesMapper searchattribute.Mapper,
103+
searchAttributesMapperProvider searchattribute.MapperProvider,
104104
processor Processor,
105105
processorAckTimeout dynamicconfig.DurationPropertyFn,
106106
disableOrderByClause dynamicconfig.BoolPropertyFn,
107107
metricsHandler metrics.Handler,
108108
) *visibilityStore {
109109

110110
return &visibilityStore{
111-
esClient: esClient,
112-
index: index,
113-
searchAttributesProvider: searchAttributesProvider,
114-
searchAttributesMapper: searchAttributesMapper,
115-
processor: processor,
116-
processorAckTimeout: processorAckTimeout,
117-
disableOrderByClause: disableOrderByClause,
118-
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)),
111+
esClient: esClient,
112+
index: index,
113+
searchAttributesProvider: searchAttributesProvider,
114+
searchAttributesMapperProvider: searchAttributesMapperProvider,
115+
processor: processor,
116+
processorAckTimeout: processorAckTimeout,
117+
disableOrderByClause: disableOrderByClause,
118+
metricsHandler: metricsHandler.WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)),
119119
}
120120
}
121121

@@ -730,7 +730,7 @@ func (s *visibilityStore) convertQuery(
730730
if err != nil {
731731
return nil, nil, serviceerror.NewUnavailable(fmt.Sprintf("Unable to read search attribute types: %v", err))
732732
}
733-
nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapper)
733+
nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapperProvider)
734734
queryConverter := newQueryConverter(nameInterceptor, NewValuesInterceptor())
735735
requestQuery, fieldSorts, err := queryConverter.ConvertWhereOrderBy(requestQueryStr)
736736
if err != nil {
@@ -991,7 +991,7 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa
991991
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()).Record(1)
992992
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to encode custom search attributes of Elasticsearch document(%s): %v", docID, err))
993993
}
994-
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapper, record.SearchAttributes, namespace.String())
994+
aliasedSas, err := searchattribute.AliasFields(s.searchAttributesMapperProvider, record.SearchAttributes, namespace.String())
995995
if err != nil {
996996
return nil, err
997997
}

common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go

+26-43
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ type (
5757
suite.Suite
5858
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
5959
*require.Assertions
60-
controller *gomock.Controller
61-
visibilityStore *visibilityStore
62-
mockESClient *client.MockClient
63-
mockProcessor *MockProcessor
64-
mockMetricsHandler *metrics.MockHandler
65-
mockSearchAttributesMapper *searchattribute.MockMapper
60+
controller *gomock.Controller
61+
visibilityStore *visibilityStore
62+
mockESClient *client.MockClient
63+
mockProcessor *MockProcessor
64+
mockMetricsHandler *metrics.MockHandler
65+
mockSearchAttributesMapperProvider *searchattribute.MockMapperProvider
6666
}
6767
)
6868

@@ -126,12 +126,12 @@ func (s *ESVisibilitySuite) SetupTest() {
126126
s.mockMetricsHandler.EXPECT().WithTags(metrics.OperationTag(metrics.ElasticsearchVisibility)).Return(s.mockMetricsHandler).AnyTimes()
127127
s.mockProcessor = NewMockProcessor(s.controller)
128128
s.mockESClient = client.NewMockClient(s.controller)
129-
s.mockSearchAttributesMapper = searchattribute.NewMockMapper(s.controller)
129+
s.mockSearchAttributesMapperProvider = searchattribute.NewMockMapperProvider(s.controller)
130130
s.visibilityStore = NewVisibilityStore(
131131
s.mockESClient,
132132
testIndex,
133133
searchattribute.NewTestProvider(),
134-
nil,
134+
searchattribute.NewTestMapperProvider(nil),
135135
s.mockProcessor,
136136
esProcessorAckTimeout,
137137
visibilityDisableOrderByClause,
@@ -649,15 +649,10 @@ func (s *ESVisibilitySuite) Test_convertQuery() {
649649
}
650650

651651
func (s *ESVisibilitySuite) Test_convertQuery_Mapper() {
652-
s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn(
653-
func(alias string, namespace string) (string, error) {
654-
if strings.HasPrefix(alias, "AliasFor") {
655-
return strings.TrimPrefix(alias, "AliasFor"), nil
656-
}
657-
return "", serviceerror.NewInvalidArgument("mapper error")
658-
}).AnyTimes()
652+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
653+
Return(&searchattribute.TestMapper{}, nil).AnyTimes()
659654

660-
s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
655+
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider
661656

662657
query := `WorkflowId = 'wid'`
663658
qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query)
@@ -707,16 +702,14 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper() {
707702
s.Error(err)
708703
s.ErrorAs(err, &invalidArgumentErr)
709704
s.EqualError(err, "invalid query: unable to convert 'order by' column name: invalid search attribute: AliasForUnknownField")
710-
s.visibilityStore.searchAttributesMapper = nil
705+
s.visibilityStore.searchAttributesMapperProvider = nil
711706
}
712707

713708
func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() {
714-
s.mockSearchAttributesMapper.EXPECT().GetFieldName(gomock.Any(), testNamespace.String()).DoAndReturn(
715-
func(fieldName string, namespace string) (string, error) {
716-
return "", serviceerror.NewInvalidArgument("mapper error")
717-
}).AnyTimes()
709+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
710+
Return(&searchattribute.TestMapper{}, nil).AnyTimes()
718711

719-
s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
712+
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider
720713

721714
query := `WorkflowId = 'wid'`
722715
qry, srt, err := s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query)
@@ -743,7 +736,7 @@ func (s *ESVisibilitySuite) Test_convertQuery_Mapper_Error() {
743736
s.ErrorAs(err, &invalidArgumentErr)
744737
s.EqualError(err, "mapper error")
745738

746-
s.visibilityStore.searchAttributesMapper = nil
739+
s.visibilityStore.searchAttributesMapperProvider = nil
747740
}
748741

749742
func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
@@ -966,37 +959,27 @@ func (s *ESVisibilitySuite) TestParseESDoc_SearchAttributes_WithMapper() {
966959
"CustomIntField": [111,222],
967960
"CustomBoolField": true,
968961
"UnknownField": "random"}`)
969-
s.visibilityStore.searchAttributesMapper = s.mockSearchAttributesMapper
962+
s.visibilityStore.searchAttributesMapperProvider = s.mockSearchAttributesMapperProvider
970963

971-
s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn(
972-
func(fieldName string, namespace string) (string, error) {
973-
return "AliasOf" + fieldName, nil
974-
}).Times(6)
964+
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(testNamespace).
965+
Return(&searchattribute.TestMapper{}, nil).AnyTimes()
975966

976967
info, err := s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace)
977968
s.NoError(err)
978969
s.NotNil(info)
979970

980971
s.Len(info.SearchAttributes.GetIndexedFields(), 7)
981972
s.Contains(info.SearchAttributes.GetIndexedFields(), "TemporalChangeVersion")
982-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomKeywordField")
983-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomTextField")
984-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDatetimeField")
985-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomDoubleField")
986-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomBoolField")
987-
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasOfCustomIntField")
973+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomKeywordField")
974+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomTextField")
975+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDatetimeField")
976+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomDoubleField")
977+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomBoolField")
978+
s.Contains(info.SearchAttributes.GetIndexedFields(), "AliasForCustomIntField")
988979
s.NotContains(info.SearchAttributes.GetIndexedFields(), "UnknownField")
989980
s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED, info.Status)
990981

991-
s.mockSearchAttributesMapper.EXPECT().GetAlias(gomock.Any(), testNamespace.String()).DoAndReturn(
992-
func(fieldName string, namespace string) (string, error) {
993-
return "", serviceerror.NewUnavailable("error")
994-
})
995-
info, err = s.visibilityStore.parseESDoc("", docSource, searchattribute.TestNameTypeMap, testNamespace)
996-
s.Error(err)
997-
s.Nil(info)
998-
999-
s.visibilityStore.searchAttributesMapper = nil
982+
s.visibilityStore.searchAttributesMapperProvider = nil
1000983
}
1001984

1002985
func (s *ESVisibilitySuite) TestListWorkflowExecutions() {

common/resource/fx.go

+9
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ var Module = fx.Options(
9797
fx.Provide(HostNameProvider),
9898
fx.Provide(TimeSourceProvider),
9999
cluster.MetadataLifetimeHooksModule,
100+
fx.Provide(SearchAttributeMapperProviderProvider),
100101
fx.Provide(SearchAttributeProviderProvider),
101102
fx.Provide(SearchAttributeManagerProvider),
102103
fx.Provide(NamespaceRegistryProvider),
@@ -162,6 +163,14 @@ func TimeSourceProvider() clock.TimeSource {
162163
return clock.NewRealTimeSource()
163164
}
164165

166+
func SearchAttributeMapperProviderProvider(
167+
saMapper searchattribute.Mapper,
168+
) searchattribute.MapperProvider {
169+
return searchattribute.NewMapperProvider(
170+
saMapper,
171+
)
172+
}
173+
165174
func SearchAttributeProviderProvider(
166175
timeSource clock.TimeSource,
167176
cmMgr persistence.ClusterMetadataManager,

common/resourcetest/resourceTest.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ import (
6363
type (
6464
// Test is the test implementation used for testing
6565
Test struct {
66-
MetricsScope tally.Scope
67-
ClusterMetadata *cluster.MockMetadata
68-
SearchAttributesProvider *searchattribute.MockProvider
69-
SearchAttributesManager *searchattribute.MockManager
70-
SearchAttributesMapper *searchattribute.MockMapper
66+
MetricsScope tally.Scope
67+
ClusterMetadata *cluster.MockMetadata
68+
SearchAttributesProvider *searchattribute.MockProvider
69+
SearchAttributesManager *searchattribute.MockManager
70+
SearchAttributesMapperProvider *searchattribute.MockMapperProvider
7171

7272
// other common resources
7373

@@ -174,11 +174,11 @@ func NewTest(
174174
)
175175

176176
return &Test{
177-
MetricsScope: scope,
178-
ClusterMetadata: cluster.NewMockMetadata(controller),
179-
SearchAttributesProvider: searchattribute.NewMockProvider(controller),
180-
SearchAttributesManager: searchattribute.NewMockManager(controller),
181-
SearchAttributesMapper: searchattribute.NewMockMapper(controller),
177+
MetricsScope: scope,
178+
ClusterMetadata: cluster.NewMockMetadata(controller),
179+
SearchAttributesProvider: searchattribute.NewMockProvider(controller),
180+
SearchAttributesManager: searchattribute.NewMockManager(controller),
181+
SearchAttributesMapperProvider: searchattribute.NewMockMapperProvider(controller),
182182

183183
// other common resources
184184

@@ -439,8 +439,8 @@ func (t *Test) GetSearchAttributesManager() searchattribute.Manager {
439439
return t.SearchAttributesManager
440440
}
441441

442-
func (t *Test) GetSearchAttributesMapper() searchattribute.Mapper {
443-
return t.SearchAttributesMapper
442+
func (t *Test) GetSearchAttributesMapperProvider() searchattribute.MapperProvider {
443+
return t.SearchAttributesMapperProvider
444444
}
445445

446446
func (t *Test) RefreshNamespaceCache() {

0 commit comments

Comments
 (0)