Skip to content

Commit 554b32d

Browse files
Produce archival tasks conditionally (#3823)
1 parent b9bba94 commit 554b32d

11 files changed

+361
-34
lines changed

common/archiver/archivalMetadata.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func NewArchivalConfig(
146146
}
147147
}
148148

149-
// NewDisabledArchvialConfig returns a disabled ArchivalConfig
149+
// NewDisabledArchvialConfig returns an ArchivalConfig where archival is disabled for both the cluster and the namespace
150150
func NewDisabledArchvialConfig() ArchivalConfig {
151151
return &archivalConfig{
152152
staticClusterState: ArchivalDisabled,
@@ -157,6 +157,17 @@ func NewDisabledArchvialConfig() ArchivalConfig {
157157
}
158158
}
159159

160+
// NewEnabledArchivalConfig returns an ArchivalConfig where archival is enabled for both the cluster and the namespace
161+
func NewEnabledArchivalConfig() ArchivalConfig {
162+
return &archivalConfig{
163+
staticClusterState: ArchivalEnabled,
164+
dynamicClusterState: dynamicconfig.GetStringPropertyFn("enabled"),
165+
enableRead: dynamicconfig.GetBoolPropertyFn(true),
166+
namespaceDefaultState: enumspb.ARCHIVAL_STATE_ENABLED,
167+
namespaceDefaultURI: "some-uri",
168+
}
169+
}
170+
160171
// ClusterConfiguredForArchival returns true if cluster is configured to handle archival, false otherwise
161172
func (a *archivalConfig) ClusterConfiguredForArchival() bool {
162173
return a.GetClusterState() == ArchivalEnabled

common/archiver/metadata_mock.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package archiver
26+
27+
import (
28+
"github.com/golang/mock/gomock"
29+
)
30+
31+
// MetadataMock is an implementation of ArchivalMetadata that can be used for testing.
32+
// It can be used as a mock, but it also provides default values, which is something that can't be done with
33+
// *MockArchivalMetadata. This cuts down on the amount of boilerplate code needed to write tests.
34+
type MetadataMock interface {
35+
ArchivalMetadata
36+
// EXPECT returns a MetadataMockRecorder which can be used to set expectations on the mock.
37+
EXPECT() MetadataMockRecorder
38+
// SetHistoryEnabledByDefault sets the default history archival config to be enabled.
39+
SetHistoryEnabledByDefault()
40+
// SetVisibilityEnabledByDefault sets the default visibility archival config to be enabled.
41+
SetVisibilityEnabledByDefault()
42+
}
43+
44+
// NewMetadataMock returns a new MetadataMock which uses the provided controller to create a MockArchivalMetadata
45+
// instance.
46+
func NewMetadataMock(controller *gomock.Controller) MetadataMock {
47+
m := &metadataMock{
48+
MockArchivalMetadata: NewMockArchivalMetadata(controller),
49+
defaultHistoryConfig: NewDisabledArchvialConfig(),
50+
defaultVisibilityConfig: NewDisabledArchvialConfig(),
51+
}
52+
return m
53+
}
54+
55+
// MetadataMockRecorder is a wrapper around a ArchivalMetadata mock recorder.
56+
// It is used to determine whether any calls to EXPECT().GetHistoryConfig() or EXPECT().GetVisibilityConfig() were made.
57+
// A call to EXPECT().GetSomeConfig() causes that default config to no longer be used.
58+
type MetadataMockRecorder interface {
59+
GetHistoryConfig() *gomock.Call
60+
GetVisibilityConfig() *gomock.Call
61+
}
62+
63+
type metadataMock struct {
64+
*MockArchivalMetadata
65+
defaultHistoryConfig ArchivalConfig
66+
defaultVisibilityConfig ArchivalConfig
67+
historyOverwritten bool
68+
visibilityOverwritten bool
69+
}
70+
71+
func (m *metadataMock) SetHistoryEnabledByDefault() {
72+
m.defaultHistoryConfig = NewEnabledArchivalConfig()
73+
}
74+
75+
func (m *metadataMock) SetVisibilityEnabledByDefault() {
76+
m.defaultVisibilityConfig = NewEnabledArchivalConfig()
77+
}
78+
79+
func (m *metadataMock) GetHistoryConfig() ArchivalConfig {
80+
if !m.historyOverwritten {
81+
return m.defaultHistoryConfig
82+
}
83+
return m.MockArchivalMetadata.GetHistoryConfig()
84+
}
85+
86+
func (m *metadataMock) GetVisibilityConfig() ArchivalConfig {
87+
if !m.visibilityOverwritten {
88+
return m.defaultVisibilityConfig
89+
}
90+
return m.MockArchivalMetadata.GetVisibilityConfig()
91+
}
92+
93+
func (m *metadataMock) EXPECT() MetadataMockRecorder {
94+
return metadataMockRecorder{m}
95+
}
96+
97+
type metadataMockRecorder struct {
98+
*metadataMock
99+
}
100+
101+
func (r metadataMockRecorder) GetHistoryConfig() *gomock.Call {
102+
r.metadataMock.historyOverwritten = true
103+
return r.MockArchivalMetadata.EXPECT().GetHistoryConfig()
104+
}
105+
106+
func (r metadataMockRecorder) GetVisibilityConfig() *gomock.Call {
107+
r.metadataMock.visibilityOverwritten = true
108+
return r.MockArchivalMetadata.EXPECT().GetVisibilityConfig()
109+
}

common/archiver/metadata_mock_test.go

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package archiver
26+
27+
import (
28+
"testing"
29+
30+
"github.com/golang/mock/gomock"
31+
"github.com/stretchr/testify/assert"
32+
)
33+
34+
func TestMetadataMock(t *testing.T) {
35+
t.Run("GetHistoryConfig", func(t *testing.T) {
36+
metadata := NewMetadataMock(gomock.NewController(t))
37+
config := metadata.GetHistoryConfig()
38+
39+
assert.False(t, config.ClusterConfiguredForArchival())
40+
})
41+
t.Run("GetVisibilityConfig", func(t *testing.T) {
42+
metadata := NewMetadataMock(gomock.NewController(t))
43+
config := metadata.GetVisibilityConfig()
44+
45+
assert.False(t, config.ClusterConfiguredForArchival())
46+
})
47+
t.Run("GetHistoryConfig_SetHistoryEnabledByDefault", func(t *testing.T) {
48+
metadata := NewMetadataMock(gomock.NewController(t))
49+
metadata.SetHistoryEnabledByDefault()
50+
config := metadata.GetHistoryConfig()
51+
52+
assert.True(t, config.ClusterConfiguredForArchival())
53+
54+
metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig())
55+
config = metadata.GetHistoryConfig()
56+
57+
assert.False(t, config.ClusterConfiguredForArchival())
58+
})
59+
t.Run("GetVisibilityConfig_SetVisibilityEnabledByDefault", func(t *testing.T) {
60+
metadata := NewMetadataMock(gomock.NewController(t))
61+
metadata.SetVisibilityEnabledByDefault()
62+
config := metadata.GetVisibilityConfig()
63+
64+
assert.True(t, config.ClusterConfiguredForArchival())
65+
66+
metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig())
67+
config = metadata.GetVisibilityConfig()
68+
69+
assert.False(t, config.ClusterConfiguredForArchival())
70+
})
71+
t.Run("EXPECT_GetHistoryConfig", func(t *testing.T) {
72+
metadata := NewMetadataMock(gomock.NewController(t))
73+
metadata.EXPECT().GetHistoryConfig().Return(NewEnabledArchivalConfig())
74+
config := metadata.GetHistoryConfig()
75+
76+
assert.True(t, config.ClusterConfiguredForArchival())
77+
78+
metadata.EXPECT().GetHistoryConfig().Return(NewDisabledArchvialConfig())
79+
config = metadata.GetHistoryConfig()
80+
81+
assert.False(t, config.ClusterConfiguredForArchival())
82+
})
83+
84+
t.Run("EXPECT_GetVisibilityConfig", func(t *testing.T) {
85+
metadata := NewMetadataMock(gomock.NewController(t))
86+
metadata.EXPECT().GetVisibilityConfig().Return(NewEnabledArchivalConfig())
87+
config := metadata.GetVisibilityConfig()
88+
89+
assert.True(t, config.ClusterConfiguredForArchival())
90+
91+
metadata.EXPECT().GetVisibilityConfig().Return(NewDisabledArchvialConfig())
92+
config = metadata.GetVisibilityConfig()
93+
94+
assert.False(t, config.ClusterConfiguredForArchival())
95+
})
96+
}

common/resourcetest/resourceTest.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type (
7474
TimeSource clock.TimeSource
7575
PayloadSerializer serialization.Serializer
7676
MetricsHandler metrics.Handler
77-
ArchivalMetadata *archiver.MockArchivalMetadata
77+
ArchivalMetadata archiver.MetadataMock
7878
ArchiverProvider *provider.MockArchiverProvider
7979

8080
// membership infos
@@ -184,7 +184,7 @@ func NewTest(
184184
TimeSource: clock.NewRealTimeSource(),
185185
PayloadSerializer: serialization.NewSerializer(),
186186
MetricsHandler: metricsHandler,
187-
ArchivalMetadata: archiver.NewMockArchivalMetadata(controller),
187+
ArchivalMetadata: archiver.NewMetadataMock(controller),
188188
ArchiverProvider: provider.NewMockArchiverProvider(controller),
189189

190190
// membership infos

service/frontend/workflow_handler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type (
106106
mockMetadataMgr *persistence.MockMetadataManager
107107
mockExecutionManager *persistence.MockExecutionManager
108108
mockVisibilityMgr *manager.MockVisibilityManager
109-
mockArchivalMetadata *archiver.MockArchivalMetadata
109+
mockArchivalMetadata archiver.MetadataMock
110110
mockArchiverProvider *provider.MockArchiverProvider
111111
mockHistoryArchiver *archiver.MockHistoryArchiver
112112
mockVisibilityArchiver *archiver.MockVisibilityArchiver

service/history/archival_queue_task_executor.go

+1
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func (e *archivalQueueTaskExecutor) addDeletionTask(
246246
e.shardContext.GetNamespaceRegistry(),
247247
mutableState,
248248
e.shardContext.GetConfig(),
249+
e.shardContext.GetArchivalMetadata(),
249250
)
250251
err = taskGenerator.GenerateDeleteHistoryEventTask(*closeTime, true)
251252
if err != nil {

service/history/transferQueueActiveTaskExecutor_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ import (
5151
"go.temporal.io/server/api/matchingservicemock/v1"
5252
persistencespb "go.temporal.io/server/api/persistence/v1"
5353
workflowspb "go.temporal.io/server/api/workflow/v1"
54-
5554
"go.temporal.io/server/common"
5655
"go.temporal.io/server/common/archiver"
5756
"go.temporal.io/server/common/archiver/provider"
@@ -102,7 +101,7 @@ type (
102101

103102
mockExecutionMgr *persistence.MockExecutionManager
104103
mockArchivalClient *warchiver.MockClient
105-
mockArchivalMetadata *archiver.MockArchivalMetadata
104+
mockArchivalMetadata archiver.MetadataMock
106105
mockArchiverProvider *provider.MockArchiverProvider
107106
mockParentClosePolicyClient *parentclosepolicy.MockClient
108107

@@ -207,6 +206,8 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() {
207206
s.mockClusterMetadata.EXPECT().GetAllClusterInfo().Return(cluster.TestAllClusterInfo).AnyTimes()
208207
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
209208
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(s.namespaceEntry.IsGlobalNamespace(), s.version).Return(s.mockClusterMetadata.GetCurrentClusterName()).AnyTimes()
209+
s.mockArchivalMetadata.SetHistoryEnabledByDefault()
210+
s.mockArchivalMetadata.SetVisibilityEnabledByDefault()
210211

211212
s.workflowCache = wcache.NewCache(s.mockShard)
212213
s.logger = s.mockShard.GetLogger()
@@ -800,7 +801,7 @@ func (s *transferQueueActiveTaskExecutorSuite) TestProcessCloseExecution_CanSkip
800801
dc.GetBoolPropertyFn(true),
801802
"disabled",
802803
"random URI",
803-
))
804+
)).AnyTimes()
804805
s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil)
805806
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false)
806807
}

service/history/transferQueueStandbyTaskExecutor_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ type (
9595

9696
mockExecutionMgr *persistence.MockExecutionManager
9797
mockArchivalClient *warchiver.MockClient
98-
mockArchivalMetadata *archiver.MockArchivalMetadata
98+
mockArchivalMetadata archiver.MetadataMock
9999
mockArchiverProvider *provider.MockArchiverProvider
100100

101101
workflowCache wcache.Cache
@@ -194,6 +194,9 @@ func (s *transferQueueStandbyTaskExecutorSuite) SetupTest() {
194194
s.workflowCache = wcache.NewCache(s.mockShard)
195195
s.logger = s.mockShard.GetLogger()
196196

197+
s.mockArchivalMetadata.SetHistoryEnabledByDefault()
198+
s.mockArchivalMetadata.SetVisibilityEnabledByDefault()
199+
197200
h := &historyEngineImpl{
198201
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
199202
shard: s.mockShard,
@@ -745,7 +748,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution_CanSki
745748
"disabled",
746749
"random URI",
747750
),
748-
)
751+
).AnyTimes()
749752
s.mockArchivalClient.EXPECT().Archive(gomock.Any(), gomock.Any()).Return(nil, nil)
750753
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false)
751754
}

service/history/workflow/task_generator.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"go.temporal.io/api/serviceerror"
3636

3737
enumsspb "go.temporal.io/server/api/enums/v1"
38+
"go.temporal.io/server/common/archiver"
3839
"go.temporal.io/server/common/backoff"
3940
"go.temporal.io/server/common/namespace"
4041
"go.temporal.io/server/common/persistence/versionhistory"
@@ -108,6 +109,7 @@ type (
108109
namespaceRegistry namespace.Registry
109110
mutableState MutableState
110111
config *configs.Config
112+
archivalMetadata archiver.ArchivalMetadata
111113
}
112114
)
113115

@@ -119,11 +121,13 @@ func NewTaskGenerator(
119121
namespaceRegistry namespace.Registry,
120122
mutableState MutableState,
121123
config *configs.Config,
124+
archivalMetadata archiver.ArchivalMetadata,
122125
) *TaskGeneratorImpl {
123126
return &TaskGeneratorImpl{
124127
namespaceRegistry: namespaceRegistry,
125128
mutableState: mutableState,
126129
config: config,
130+
archivalMetadata: archivalMetadata,
127131
}
128132
}
129133

@@ -188,7 +192,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
188192
Version: currentVersion,
189193
},
190194
)
191-
if r.config.DurableArchivalEnabled() {
195+
if r.archivalQueueEnabled() {
192196
retention, err := r.getRetention()
193197
if err != nil {
194198
return err
@@ -636,3 +640,17 @@ func (r *TaskGeneratorImpl) getTargetNamespaceID(
636640

637641
return namespace.ID(r.mutableState.GetExecutionInfo().NamespaceId), nil
638642
}
643+
644+
// archivalQueueEnabled returns true if archival is enabled for either history or visibility, and the archival queue
645+
// itself is also enabled.
646+
// For both history and visibility, we check that archival is enabled for both the cluster and the namespace.
647+
func (r *TaskGeneratorImpl) archivalQueueEnabled() bool {
648+
if !r.config.DurableArchivalEnabled() {
649+
return false
650+
}
651+
namespaceEntry := r.mutableState.GetNamespaceEntry()
652+
return r.archivalMetadata.GetHistoryConfig().ClusterConfiguredForArchival() &&
653+
namespaceEntry.HistoryArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED ||
654+
r.archivalMetadata.GetVisibilityConfig().ClusterConfiguredForArchival() &&
655+
namespaceEntry.VisibilityArchivalState().State == enumspb.ARCHIVAL_STATE_ENABLED
656+
}

service/history/workflow/task_generator_provider.go

+1
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,6 @@ func (p *taskGeneratorProviderImpl) NewTaskGenerator(
5656
shard.GetNamespaceRegistry(),
5757
mutableState,
5858
shard.GetConfig(),
59+
shard.GetArchivalMetadata(),
5960
)
6061
}

0 commit comments

Comments
 (0)