Skip to content

Commit 285bf33

Browse files
Start the archival queue iff history or visibility archival is enabled in the static config (#3827)
Enable archival queue factory iff it is enabled in the static config
1 parent 554b32d commit 285bf33

5 files changed

+245
-6
lines changed

common/archiver/archivalMetadata.go

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type (
5252
ReadEnabled() bool
5353
GetNamespaceDefaultState() enumspb.ArchivalState
5454
GetNamespaceDefaultURI() string
55+
StaticClusterState() ArchivalState
5556
}
5657

5758
archivalMetadata struct {
@@ -71,6 +72,10 @@ type (
7172
ArchivalState int
7273
)
7374

75+
func (a *archivalConfig) StaticClusterState() ArchivalState {
76+
return a.staticClusterState
77+
}
78+
7479
const (
7580
// ArchivalDisabled means this cluster is not configured to handle archival
7681
ArchivalDisabled ArchivalState = iota

common/archiver/archivalMetadata_mock.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/historyEngineFactory.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ type (
5555
NewCacheFn wcache.NewCacheFn
5656
ArchivalClient archiver.Client
5757
EventSerializer serialization.Serializer
58-
QueueFactories []QueueFactory `group:"queueFactory"`
58+
QueueFactories []QueueFactory
5959
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
6060
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
6161
TracerProvider trace.TracerProvider

service/history/queueFactoryBase.go

+33-5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"go.uber.org/fx"
3232

3333
"go.temporal.io/server/common"
34+
"go.temporal.io/server/common/archiver"
3435
"go.temporal.io/server/common/clock"
3536
"go.temporal.io/server/common/cluster"
3637
"go.temporal.io/server/common/dynamicconfig"
@@ -87,33 +88,60 @@ type (
8788
fx.In
8889

8990
Lifecycle fx.Lifecycle
90-
Factories []QueueFactory `group:"queueFactory"`
91+
Factories []QueueFactory
9192
}
9293
)
9394

9495
var QueueModule = fx.Options(
9596
fx.Provide(QueueSchedulerRateLimiterProvider),
9697
fx.Provide(
9798
fx.Annotated{
98-
Group: QueueFactoryFxGroup,
99+
Name: "transferQueueFactory",
99100
Target: NewTransferQueueFactory,
100101
},
101102
fx.Annotated{
102-
Group: QueueFactoryFxGroup,
103+
Name: "timerQueueFactory",
103104
Target: NewTimerQueueFactory,
104105
},
105106
fx.Annotated{
106-
Group: QueueFactoryFxGroup,
107+
Name: "visibilityQueueFactory",
107108
Target: NewVisibilityQueueFactory,
108109
},
109110
fx.Annotated{
110-
Group: QueueFactoryFxGroup,
111+
Name: "archivalQueueFactory",
111112
Target: NewArchivalQueueFactory,
112113
},
114+
getQueueFactories,
113115
),
114116
fx.Invoke(QueueFactoryLifetimeHooks),
115117
)
116118

119+
type queueFactorySet struct {
120+
fx.In
121+
122+
TransferQueueFactory QueueFactory `name:"transferQueueFactory"`
123+
TimerQueueFactory QueueFactory `name:"timerQueueFactory"`
124+
VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"`
125+
ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"`
126+
}
127+
128+
// getQueueFactories returns factories for all the enabled queue types.
129+
// The archival queue factory is only returned when archival is enabled in the static config.
130+
func getQueueFactories(
131+
queueFactorySet queueFactorySet,
132+
archivalMetadata archiver.ArchivalMetadata,
133+
) []QueueFactory {
134+
factories := []QueueFactory{
135+
queueFactorySet.TransferQueueFactory,
136+
queueFactorySet.TimerQueueFactory,
137+
queueFactorySet.VisibilityQueueFactory,
138+
}
139+
if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled {
140+
factories = append(factories, queueFactorySet.ArchivalQueueFactory)
141+
}
142+
return factories
143+
}
144+
117145
func QueueSchedulerRateLimiterProvider(
118146
config *configs.Config,
119147
) queues.SchedulerRateLimiter {
+192
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package history
26+
27+
import (
28+
"testing"
29+
30+
"github.com/golang/mock/gomock"
31+
"github.com/stretchr/testify/require"
32+
"go.uber.org/fx"
33+
34+
"go.temporal.io/server/api/historyservice/v1"
35+
"go.temporal.io/server/client"
36+
carchiver "go.temporal.io/server/common/archiver"
37+
"go.temporal.io/server/common/clock"
38+
"go.temporal.io/server/common/cluster"
39+
"go.temporal.io/server/common/dynamicconfig"
40+
"go.temporal.io/server/common/log"
41+
"go.temporal.io/server/common/metrics"
42+
"go.temporal.io/server/common/namespace"
43+
"go.temporal.io/server/common/persistence/visibility/manager"
44+
"go.temporal.io/server/common/resource"
45+
"go.temporal.io/server/common/sdk"
46+
"go.temporal.io/server/service/history/archival"
47+
"go.temporal.io/server/service/history/configs"
48+
"go.temporal.io/server/service/history/workflow"
49+
"go.temporal.io/server/service/worker/archiver"
50+
)
51+
52+
// TestQueueModule_ArchivalQueueCreated tests that the archival queue is created if and only if the static config for
53+
// either history or visibility archival is enabled.
54+
func TestQueueModule_ArchivalQueue(t *testing.T) {
55+
for _, c := range []moduleTestCase{
56+
{
57+
Name: "Archival completely disabled",
58+
HistoryState: carchiver.ArchivalDisabled,
59+
VisibilityState: carchiver.ArchivalDisabled,
60+
ExpectArchivalQueue: false,
61+
},
62+
{
63+
Name: "History archival enabled",
64+
HistoryState: carchiver.ArchivalEnabled,
65+
VisibilityState: carchiver.ArchivalDisabled,
66+
ExpectArchivalQueue: true,
67+
},
68+
{
69+
Name: "Visibility archival enabled",
70+
HistoryState: carchiver.ArchivalDisabled,
71+
VisibilityState: carchiver.ArchivalEnabled,
72+
ExpectArchivalQueue: true,
73+
},
74+
{
75+
Name: "Both history and visibility archival enabled",
76+
HistoryState: carchiver.ArchivalEnabled,
77+
VisibilityState: carchiver.ArchivalEnabled,
78+
ExpectArchivalQueue: true,
79+
},
80+
} {
81+
c := c
82+
t.Run(c.Name, c.Run)
83+
}
84+
}
85+
86+
// moduleTestCase is a test case for the QueueModule.
87+
type moduleTestCase struct {
88+
Name string
89+
HistoryState carchiver.ArchivalState
90+
VisibilityState carchiver.ArchivalState
91+
ExpectArchivalQueue bool
92+
}
93+
94+
// Run runs the test case.
95+
func (c *moduleTestCase) Run(t *testing.T) {
96+
t.Parallel()
97+
controller := gomock.NewController(t)
98+
dependencies := getModuleDependencies(controller, c)
99+
var factories []QueueFactory
100+
101+
app := fx.New(
102+
dependencies,
103+
QueueModule,
104+
fx.Invoke(func(params QueueFactoriesLifetimeHookParams) {
105+
factories = params.Factories
106+
}),
107+
)
108+
109+
require.NoError(t, app.Err())
110+
require.NotNil(t, factories)
111+
var (
112+
txq QueueFactory
113+
tiq QueueFactory
114+
viq QueueFactory
115+
aq QueueFactory
116+
)
117+
for _, f := range factories {
118+
switch f.(type) {
119+
case *transferQueueFactory:
120+
require.Nil(t, txq)
121+
txq = f
122+
case *timerQueueFactory:
123+
require.Nil(t, tiq)
124+
tiq = f
125+
case *visibilityQueueFactory:
126+
require.Nil(t, viq)
127+
viq = f
128+
case *archivalQueueFactory:
129+
require.Nil(t, aq)
130+
aq = f
131+
}
132+
}
133+
require.NotNil(t, txq)
134+
require.NotNil(t, tiq)
135+
require.NotNil(t, viq)
136+
if c.ExpectArchivalQueue {
137+
require.NotNil(t, aq)
138+
} else {
139+
require.Nil(t, aq)
140+
}
141+
}
142+
143+
// getModuleDependencies returns an fx.Option that provides all the dependencies needed for the queue module.
144+
func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx.Option {
145+
cfg := configs.NewConfig(
146+
dynamicconfig.NewNoopCollection(),
147+
1,
148+
false,
149+
"",
150+
)
151+
archivalMetadata := getArchivalMetadata(controller, c)
152+
clusterMetadata := cluster.NewMockMetadata(controller)
153+
clusterMetadata.EXPECT().GetCurrentClusterName().Return("module-test-cluster-name").AnyTimes()
154+
return fx.Supply(
155+
compileTimeDependencies{},
156+
cfg,
157+
fx.Annotate(archivalMetadata, fx.As(new(carchiver.ArchivalMetadata))),
158+
fx.Annotate(metrics.NoopMetricsHandler, fx.As(new(metrics.Handler))),
159+
fx.Annotate(clusterMetadata, fx.As(new(cluster.Metadata))),
160+
)
161+
}
162+
163+
// compileTimeDependencies is a struct that provides nil implementations of all the dependencies needed for the queue
164+
// module that are not required for the test at runtime.
165+
type compileTimeDependencies struct {
166+
fx.Out
167+
168+
namespace.Registry
169+
clock.TimeSource
170+
log.SnTaggedLogger
171+
client.Bean
172+
archiver.Client
173+
sdk.ClientFactory
174+
resource.MatchingClient
175+
historyservice.HistoryServiceClient
176+
manager.VisibilityManager
177+
archival.Archiver
178+
workflow.RelocatableAttributesFetcher
179+
}
180+
181+
// getArchivalMetadata returns a mock ArchivalMetadata that contains the static archival config specified in the given
182+
// test case.
183+
func getArchivalMetadata(controller *gomock.Controller, c *moduleTestCase) *carchiver.MockArchivalMetadata {
184+
archivalMetadata := carchiver.NewMockArchivalMetadata(controller)
185+
historyConfig := carchiver.NewMockArchivalConfig(controller)
186+
visibilityConfig := carchiver.NewMockArchivalConfig(controller)
187+
historyConfig.EXPECT().StaticClusterState().Return(c.HistoryState).AnyTimes()
188+
visibilityConfig.EXPECT().StaticClusterState().Return(c.VisibilityState).AnyTimes()
189+
archivalMetadata.EXPECT().GetHistoryConfig().Return(historyConfig).AnyTimes()
190+
archivalMetadata.EXPECT().GetVisibilityConfig().Return(visibilityConfig).AnyTimes()
191+
return archivalMetadata
192+
}

0 commit comments

Comments
 (0)