Skip to content

Commit 532c422

Browse files
Add an archival queue factory (#3562)
1 parent 0331549 commit 532c422

File tree

5 files changed

+283
-10
lines changed

5 files changed

+283
-10
lines changed

common/dynamicconfig/constants.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -534,9 +534,6 @@ const (
534534
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
535535
// archivalQueueProcessor
536536
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"
537-
// ArchivalProcessorSchedulerRoundRobinWeights is the priority round robin weights by archival task scheduler for
538-
// all namespaces
539-
ArchivalProcessorSchedulerRoundRobinWeights = "history.archivalProcessorSchedulerRoundRobinWeights"
540537
// ArchivalProcessorMaxPollInterval max poll interval for archivalQueueProcessor
541538
ArchivalProcessorMaxPollInterval = "history.archivalProcessorMaxPollInterval"
542539
// ArchivalProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
@@ -550,6 +547,8 @@ const (
550547
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
551548
// ArchivalProcessorArchiveDelay is the delay before archivalQueueProcessor starts to process archival tasks
552549
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
550+
// ArchivalProcessorRetryWarningLimit is the number of times an archival task may be retried before we log a warning
551+
ArchivalProcessorRetryWarningLimit = "history.archivalProcessorRetryLimitWarning"
553552

554553
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
555554
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"
+184
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package history
26+
27+
import (
28+
"go.uber.org/fx"
29+
30+
"go.temporal.io/server/common/log"
31+
"go.temporal.io/server/common/log/tag"
32+
"go.temporal.io/server/common/metrics"
33+
ctasks "go.temporal.io/server/common/tasks"
34+
"go.temporal.io/server/service/history/archival"
35+
"go.temporal.io/server/service/history/configs"
36+
"go.temporal.io/server/service/history/queues"
37+
"go.temporal.io/server/service/history/shard"
38+
"go.temporal.io/server/service/history/tasks"
39+
"go.temporal.io/server/service/history/workflow"
40+
wcache "go.temporal.io/server/service/history/workflow/cache"
41+
)
42+
43+
const (
44+
// archivalQueuePersistenceMaxRPSRatio is the hard-coded ratio of archival queue persistence max RPS to the total
45+
// persistence max RPS.
46+
// In this case, the archival queue may not send requests at a rate higher than 15% of the global persistence max
47+
// RPS.
48+
archivalQueuePersistenceMaxRPSRatio = 0.15
49+
)
50+
51+
var (
52+
// ArchivalTaskPriorities is the map of task priority to weight for the archival queue.
53+
// The archival queue only uses the low task priority, so we only define a weight for that priority.
54+
ArchivalTaskPriorities = configs.ConvertWeightsToDynamicConfigValue(map[ctasks.Priority]int{
55+
ctasks.PriorityLow: 10,
56+
})
57+
)
58+
59+
type (
60+
// ArchivalQueueFactoryParams contains the necessary params to create a new archival queue factory.
61+
ArchivalQueueFactoryParams struct {
62+
// fx.In allows fx to construct this object without an explicitly defined constructor.
63+
fx.In
64+
65+
// QueueFactoryBaseParams contains common params for all queue factories.
66+
QueueFactoryBaseParams
67+
// Archiver is the archival client used to archive history events and visibility records.
68+
Archiver archival.Archiver
69+
// RelocatableAttributesFetcher is the client used to fetch the memo and search attributes of a workflow.
70+
RelocatableAttributesFetcher workflow.RelocatableAttributesFetcher
71+
}
72+
73+
// archivalQueueFactory implements QueueFactory for the archival queue.
74+
archivalQueueFactory struct {
75+
QueueFactoryBase
76+
ArchivalQueueFactoryParams
77+
}
78+
)
79+
80+
// NewArchivalQueueFactory creates a new QueueFactory to construct archival queues.
81+
func NewArchivalQueueFactory(
82+
params ArchivalQueueFactoryParams,
83+
) QueueFactory {
84+
hostScheduler := newScheduler(params)
85+
queueFactoryBase := newQueueFactoryBase(params, hostScheduler)
86+
return &archivalQueueFactory{
87+
ArchivalQueueFactoryParams: params,
88+
QueueFactoryBase: queueFactoryBase,
89+
}
90+
}
91+
92+
// newScheduler creates a new task scheduler for tasks on the archival queue.
93+
func newScheduler(params ArchivalQueueFactoryParams) queues.Scheduler {
94+
return queues.NewPriorityScheduler(
95+
queues.PrioritySchedulerOptions{
96+
WorkerCount: params.Config.ArchivalProcessorSchedulerWorkerCount,
97+
EnableRateLimiter: params.Config.TaskSchedulerEnableRateLimiter,
98+
MaxDispatchThrottleDuration: HostSchedulerMaxDispatchThrottleDuration,
99+
Weight: func() map[string]any {
100+
return ArchivalTaskPriorities
101+
},
102+
},
103+
params.SchedulerRateLimiter,
104+
params.TimeSource,
105+
params.Logger,
106+
)
107+
}
108+
109+
// newQueueFactoryBase creates a new QueueFactoryBase for the archival queue, which contains common configurations
110+
// like the task scheduler, task priority assigner, and rate limiters.
111+
func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues.Scheduler) QueueFactoryBase {
112+
return QueueFactoryBase{
113+
HostScheduler: hostScheduler,
114+
HostPriorityAssigner: queues.NewPriorityAssigner(),
115+
HostRateLimiter: NewQueueHostRateLimiter(
116+
params.Config.ArchivalProcessorMaxPollHostRPS,
117+
params.Config.PersistenceMaxQPS,
118+
archivalQueuePersistenceMaxRPSRatio,
119+
),
120+
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
121+
NewHostRateLimiterRateFn(
122+
params.Config.ArchivalProcessorMaxPollHostRPS,
123+
params.Config.PersistenceMaxQPS,
124+
archivalQueuePersistenceMaxRPSRatio,
125+
),
126+
params.Config.QueueMaxReaderCount(),
127+
),
128+
}
129+
}
130+
131+
// CreateQueue creates a new archival queue for the given shard.
132+
func (f *archivalQueueFactory) CreateQueue(
133+
shard shard.Context,
134+
workflowCache wcache.Cache,
135+
) queues.Queue {
136+
executor := f.newArchivalTaskExecutor(shard, workflowCache)
137+
return f.newScheduledQueue(shard, executor)
138+
}
139+
140+
// newArchivalTaskExecutor creates a new archival task executor for the given shard.
141+
func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, workflowCache wcache.Cache) queues.Executor {
142+
return NewArchivalQueueTaskExecutor(
143+
f.Archiver,
144+
shard,
145+
workflowCache,
146+
f.RelocatableAttributesFetcher,
147+
f.MetricsHandler,
148+
f.Logger,
149+
)
150+
}
151+
152+
// newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations.
153+
func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue {
154+
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)
155+
return queues.NewScheduledQueue(
156+
shard,
157+
tasks.CategoryArchival,
158+
f.HostScheduler,
159+
f.HostPriorityAssigner,
160+
executor,
161+
&queues.Options{
162+
ReaderOptions: queues.ReaderOptions{
163+
BatchSize: f.Config.ArchivalTaskBatchSize,
164+
MaxPendingTasksCount: f.Config.QueuePendingTaskMaxCount,
165+
PollBackoffInterval: f.Config.ArchivalProcessorPollBackoffInterval,
166+
},
167+
MonitorOptions: queues.MonitorOptions{
168+
PendingTasksCriticalCount: f.Config.QueuePendingTaskCriticalCount,
169+
ReaderStuckCriticalAttempts: f.Config.QueueReaderStuckCriticalAttempts,
170+
SliceCountCriticalThreshold: f.Config.QueueCriticalSlicesCount,
171+
},
172+
MaxPollRPS: f.Config.ArchivalProcessorMaxPollRPS,
173+
MaxPollInterval: f.Config.ArchivalProcessorMaxPollInterval,
174+
MaxPollIntervalJitterCoefficient: f.Config.ArchivalProcessorMaxPollIntervalJitterCoefficient,
175+
CheckpointInterval: f.Config.ArchivalProcessorUpdateAckInterval,
176+
CheckpointIntervalJitterCoefficient: f.Config.ArchivalProcessorUpdateAckIntervalJitterCoefficient,
177+
MaxReaderCount: f.Config.QueueMaxReaderCount,
178+
TaskMaxRetryCount: f.Config.ArchivalProcessorRetryWarningLimit,
179+
},
180+
f.HostReaderRateLimiter,
181+
logger,
182+
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
183+
)
184+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package history
26+
27+
import (
28+
"testing"
29+
30+
"github.com/golang/mock/gomock"
31+
"github.com/stretchr/testify/assert"
32+
"github.com/stretchr/testify/require"
33+
34+
"go.temporal.io/server/api/persistence/v1"
35+
"go.temporal.io/server/common/log"
36+
"go.temporal.io/server/common/metrics"
37+
"go.temporal.io/server/common/namespace"
38+
"go.temporal.io/server/common/primitives/timestamp"
39+
"go.temporal.io/server/service/history/shard"
40+
"go.temporal.io/server/service/history/tasks"
41+
"go.temporal.io/server/service/history/tests"
42+
)
43+
44+
func TestArchivalQueueFactory(t *testing.T) {
45+
ctrl := gomock.NewController(t)
46+
metricsHandler := metrics.NewMockMetricsHandler(ctrl)
47+
metricsHandler.EXPECT().WithTags(gomock.Any()).Do(func(tags ...metrics.Tag) metrics.MetricsHandler {
48+
require.Len(t, tags, 1)
49+
assert.Equal(t, metrics.OperationTagName, tags[0].Key())
50+
assert.Equal(t, "ArchivalQueueProcessor", tags[0].Value())
51+
return metricsHandler
52+
})
53+
shardContext := shard.NewMockContext(ctrl)
54+
shardContext.EXPECT().GetLogger().Return(log.NewNoopLogger())
55+
shardContext.EXPECT().GetQueueState(tasks.CategoryArchival).Return(&persistence.QueueState{
56+
ReaderStates: nil,
57+
ExclusiveReaderHighWatermark: &persistence.TaskKey{
58+
FireTime: timestamp.TimeNowPtrUtc(),
59+
},
60+
}, true)
61+
shardContext.EXPECT().GetTimeSource().Return(namespace.NewMockClock(ctrl)).AnyTimes()
62+
63+
queueFactory := NewArchivalQueueFactory(ArchivalQueueFactoryParams{
64+
QueueFactoryBaseParams: QueueFactoryBaseParams{
65+
Config: tests.NewDynamicConfig(),
66+
TimeSource: namespace.NewMockClock(ctrl),
67+
MetricsHandler: metricsHandler,
68+
Logger: log.NewNoopLogger(),
69+
},
70+
})
71+
queue := queueFactory.CreateQueue(shardContext, nil)
72+
73+
require.NotNil(t, queue)
74+
assert.Equal(t, tasks.CategoryArchival, queue.Category())
75+
}

service/history/configs/config.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,6 @@ type Config struct {
296296

297297
// ArchivalQueueProcessor settings
298298
ArchivalProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
299-
ArchivalProcessorSchedulerRoundRobinWeights dynamicconfig.MapPropertyFnWithNamespaceFilter
300299
ArchivalProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
301300
ArchivalTaskBatchSize dynamicconfig.IntPropertyFn
302301
ArchivalProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
@@ -306,6 +305,7 @@ type Config struct {
306305
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
307306
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
308307
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
308+
ArchivalProcessorRetryWarningLimit dynamicconfig.IntPropertyFn
309309
}
310310

311311
const (
@@ -534,19 +534,19 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
534534
NamespaceCacheRefreshInterval: dc.GetDurationProperty(dynamicconfig.NamespaceCacheRefreshInterval, 10*time.Second),
535535

536536
// Archival related
537-
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
538-
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
539-
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
540-
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
541-
ArchivalProcessorSchedulerRoundRobinWeights: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.ArchivalProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultActiveTaskPriorityWeight)),
542-
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 1*time.Minute),
537+
ArchivalTaskBatchSize: dc.GetIntProperty(dynamicconfig.ArchivalTaskBatchSize, 100),
538+
ArchivalProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollRPS, 20),
539+
ArchivalProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.ArchivalProcessorMaxPollHostRPS, 0),
540+
ArchivalProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ArchivalProcessorSchedulerWorkerCount, 512),
541+
ArchivalProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorMaxPollInterval, 5*time.Minute),
543542
ArchivalProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
544543
ArchivalProcessorMaxPollIntervalJitterCoefficient, 0.15),
545544
ArchivalProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorUpdateAckInterval, 30*time.Second),
546545
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
547546
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
548547
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
549548
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
549+
ArchivalProcessorRetryWarningLimit: dc.GetIntProperty(dynamicconfig.ArchivalProcessorRetryWarningLimit, 100),
550550
}
551551

552552
return cfg

service/history/queues/priority_assigner_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,18 @@ func (s *priorityAssignerSuite) TestAssign_HighPriorityTaskTypes() {
8383

8484
s.Equal(tasks.PriorityHigh, s.priorityAssigner.Assign(mockExecutable))
8585
}
86+
87+
func (s *priorityAssignerSuite) TestAssign_LowPriorityTaskTypes() {
88+
for _, taskType := range []enumsspb.TaskType{
89+
enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT,
90+
enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
91+
enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION,
92+
enumsspb.TASK_TYPE_ARCHIVAL_ARCHIVE_EXECUTION,
93+
enumsspb.TASK_TYPE_UNSPECIFIED,
94+
} {
95+
mockExecutable := NewMockExecutable(s.controller)
96+
mockExecutable.EXPECT().GetType().Return(taskType).Times(1)
97+
98+
s.Equal(tasks.PriorityLow, s.priorityAssigner.Assign(mockExecutable))
99+
}
100+
}

0 commit comments

Comments
 (0)