Skip to content

Commit 4e8601a

Browse files
Create an FX Module for Archival (#3726)
Add an archival.Module to be used in both tests and prod
1 parent 7d66087 commit 4e8601a

File tree

10 files changed

+85
-6
lines changed

10 files changed

+85
-6
lines changed

common/dynamicconfig/constants.go

+2
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,8 @@ const (
522522
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
523523
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
524524
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"
525+
// ArchivalBackendMaxRPS is the maximum rate of requests per second to the archival backend
526+
ArchivalBackendMaxRPS = "history.archivalBackendMaxRPS"
525527

526528
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
527529
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"

service/history/api/describeworkflow/api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func Invoke(
195195
}
196196
}
197197

198-
relocatableAttributes, err := workflow.NewRelocatableAttributesFetcher(persistenceVisibilityMgr).Fetch(ctx, mutableState)
198+
relocatableAttributes, err := workflow.RelocatableAttributesFetcherProvider(persistenceVisibilityMgr).Fetch(ctx, mutableState)
199199
if err != nil {
200200
shard.GetLogger().Error(
201201
"Failed to fetch relocatable attributes",

service/history/archival/archiver_test.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/golang/mock/gomock"
3333
"github.com/stretchr/testify/assert"
3434
"github.com/stretchr/testify/require"
35+
"go.uber.org/fx"
3536
"go.uber.org/multierr"
3637

3738
carchiver "go.temporal.io/server/common/archiver"
@@ -42,6 +43,7 @@ import (
4243
"go.temporal.io/server/common/quotas"
4344
"go.temporal.io/server/common/sdk"
4445
"go.temporal.io/server/common/testing/mocksdk"
46+
"go.temporal.io/server/service/history/configs"
4547
)
4648

4749
func TestArchiver(t *testing.T) {
@@ -280,7 +282,38 @@ func TestArchiver(t *testing.T) {
280282
rateLimiter := quotas.NewMockRateLimiter(controller)
281283
rateLimiter.EXPECT().WaitN(gomock.Any(), 2).Return(c.RateLimiterWaitErr)
282284

283-
archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
285+
// we need this channel to get the Archiver which is created asynchronously
286+
archivers := make(chan Archiver, 1)
287+
// we make an app here so that we can test that the Module is working as intended
288+
app := fx.New(
289+
fx.Supply(fx.Annotate(archiverProvider, fx.As(new(provider.ArchiverProvider)))),
290+
fx.Supply(fx.Annotate(logRecorder, fx.As(new(log.Logger)))),
291+
fx.Supply(fx.Annotate(metricsHandler, fx.As(new(metrics.Handler)))),
292+
fx.Supply(&configs.Config{
293+
ArchivalBackendMaxRPS: func() float64 {
294+
return 42.0
295+
},
296+
}),
297+
Module,
298+
fx.Decorate(func(rl quotas.RateLimiter) quotas.RateLimiter {
299+
// we need to decorate the rate limiter so that we can use the mock
300+
// we also verify that the rate being used is equal to the one in the config
301+
assert.Equal(t, 42.0, rl.Rate())
302+
return rateLimiter
303+
}),
304+
fx.Invoke(func(a Archiver) {
305+
// after all parameters are provided, we get the Archiver and put it in the channel
306+
// so that we can use it in the test
307+
archivers <- a
308+
}),
309+
)
310+
require.NoError(t, app.Err())
311+
// we need to start the app for fx.Invoke to be called, so that we can get the Archiver
312+
require.NoError(t, app.Start(ctx))
313+
defer func() {
314+
require.NoError(t, app.Stop(ctx))
315+
}()
316+
archiver := <-archivers
284317
_, err = archiver.Archive(ctx, &Request{
285318
HistoryURI: historyURI,
286319
VisibilityURI: visibilityURI,

service/history/archival/fx.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package archival
26+
27+
import (
28+
"go.uber.org/fx"
29+
30+
"go.temporal.io/server/common/quotas"
31+
"go.temporal.io/server/service/history/configs"
32+
)
33+
34+
var Module = fx.Options(
35+
fx.Provide(NewArchiver),
36+
fx.Provide(func(config *configs.Config) quotas.RateLimiter {
37+
return quotas.NewDefaultOutgoingRateLimiter(quotas.RateFn(config.ArchivalBackendMaxRPS))
38+
}),
39+
)

service/history/archival_queue_task_executor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
496496
a,
497497
shardContext,
498498
workflowCache,
499-
workflow.NewRelocatableAttributesFetcher(visibilityManager),
499+
workflow.RelocatableAttributesFetcherProvider(visibilityManager),
500500
p.MetricsHandler,
501501
logger,
502502
)

service/history/configs/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ type Config struct {
295295
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
296296
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
297297
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
298+
ArchivalBackendMaxRPS dynamicconfig.FloatPropertyFn
298299
}
299300

300301
const (
@@ -525,6 +526,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
525526
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
526527
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
527528
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
529+
ArchivalBackendMaxRPS: dc.GetFloat64Property(dynamicconfig.ArchivalBackendMaxRPS, 10000.0),
528530
}
529531

530532
return cfg

service/history/fx.go

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import (
5454
"go.temporal.io/server/common/searchattribute"
5555
"go.temporal.io/server/service"
5656
"go.temporal.io/server/service/history/api"
57+
"go.temporal.io/server/service/history/archival"
5758
"go.temporal.io/server/service/history/configs"
5859
"go.temporal.io/server/service/history/consts"
5960
"go.temporal.io/server/service/history/events"
@@ -68,6 +69,7 @@ var Module = fx.Options(
6869
workflow.Module,
6970
shard.Module,
7071
cache.Module,
72+
archival.Module,
7173
fx.Provide(dynamicconfig.NewCollection),
7274
fx.Provide(ConfigProvider), // might be worth just using provider for configs.Config directly
7375
fx.Provide(RetryableInterceptorProvider),

service/history/workflow/fx.go

+1
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,5 @@ import (
3030

3131
var Module = fx.Options(
3232
fx.Populate(&taskGeneratorProvider),
33+
fx.Provide(RelocatableAttributesFetcherProvider),
3334
)

service/history/workflow/relocatable_attributes_fetcher.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type RelocatableAttributesFetcher interface {
4141
) (*RelocatableAttributes, error)
4242
}
4343

44-
// NewRelocatableAttributesFetcher creates a new instance of a RelocatableAttributesFetcher.
44+
// RelocatableAttributesFetcherProvider provides a new instance of a RelocatableAttributesFetcher.
4545
// The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff
4646
// we already moved them there out from the mutable state.
4747
// The visibility manager is not used if the relocatable attributes are still in the mutable state.
@@ -52,7 +52,7 @@ type RelocatableAttributesFetcher interface {
5252
// Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function
5353
// safely.
5454
// TODO: Add a cache around the visibility manager for the relocatable attributes.
55-
func NewRelocatableAttributesFetcher(
55+
func RelocatableAttributesFetcherProvider(
5656
visibilityManager manager.VisibilityManager,
5757
) RelocatableAttributesFetcher {
5858
return &relocatableAttributesFetcher{

service/history/workflow/relocatable_attributes_fetcher_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
126126
}
127127
ctx := context.Background()
128128

129-
fetcher := NewRelocatableAttributesFetcher(visibilityManager)
129+
fetcher := RelocatableAttributesFetcherProvider(visibilityManager)
130130
info, err := fetcher.Fetch(ctx, mutableState)
131131

132132
if c.ExpectedErr != nil {

0 commit comments

Comments
 (0)