Skip to content

Commit 0331549

Browse files
Add an archival task executor (#3663)
Add an archival queue processor
1 parent 2abbea9 commit 0331549

9 files changed

+896
-68
lines changed

common/dynamicconfig/constants.go

-2
Original file line numberDiff line numberDiff line change
@@ -531,8 +531,6 @@ const (
531531
ArchivalProcessorMaxPollRPS = "history.archivalProcessorMaxPollRPS"
532532
// ArchivalProcessorMaxPollHostRPS is max poll rate per second for all archivalQueueProcessor on a host
533533
ArchivalProcessorMaxPollHostRPS = "history.archivalProcessorMaxPollHostRPS"
534-
// ArchivalTaskMaxRetryCount is max times of retry for archivalQueueProcessor
535-
ArchivalTaskMaxRetryCount = "history.archivalTaskMaxRetryCount"
536534
// ArchivalProcessorSchedulerWorkerCount is the number of workers in the host level task scheduler for
537535
// archivalQueueProcessor
538536
ArchivalProcessorSchedulerWorkerCount = "history.archivalProcessorSchedulerWorkerCount"

common/metrics/metric_defs.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -1369,14 +1369,18 @@ var (
13691369
VersionCheckLatency = NewTimerDef("version_check_latency")
13701370

13711371
// History
1372-
CacheRequests = NewCounterDef("cache_requests")
1373-
CacheFailures = NewCounterDef("cache_errors")
1374-
CacheLatency = NewTimerDef("cache_latency")
1375-
CacheMissCounter = NewCounterDef("cache_miss")
1376-
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
1377-
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
1378-
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
1379-
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
1372+
CacheRequests = NewCounterDef("cache_requests")
1373+
CacheFailures = NewCounterDef("cache_errors")
1374+
CacheLatency = NewTimerDef("cache_latency")
1375+
CacheMissCounter = NewCounterDef("cache_miss")
1376+
HistoryEventNotificationQueueingLatency = NewTimerDef("history_event_notification_queueing_latency")
1377+
HistoryEventNotificationFanoutLatency = NewTimerDef("history_event_notification_fanout_latency")
1378+
HistoryEventNotificationInFlightMessageGauge = NewGaugeDef("history_event_notification_inflight_message_gauge")
1379+
HistoryEventNotificationFailDeliveryCount = NewCounterDef("history_event_notification_fail_delivery_count")
1380+
// ArchivalTaskInvalidURI is emitted by the archival queue task executor when the history or visibility URI for an
1381+
// archival task is not a valid URI.
1382+
// We may emit this metric several times for a single task if the task is retried.
1383+
ArchivalTaskInvalidURI = NewCounterDef("archival_task_invalid_uri")
13801384
ArchiverClientSendSignalCount = NewCounterDef("archiver_client_sent_signal")
13811385
ArchiverClientSendSignalFailureCount = NewCounterDef("archiver_client_send_signal_error")
13821386
ArchiverClientHistoryRequestCount = NewCounterDef("archiver_client_history_request")

service/history/archival/archiver.go

+17-27
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import (
4444
"go.temporal.io/server/common/log"
4545
"go.temporal.io/server/common/log/tag"
4646
"go.temporal.io/server/common/metrics"
47-
"go.temporal.io/server/common/primitives/timestamp"
4847
"go.temporal.io/server/common/quotas"
4948
"go.temporal.io/server/common/searchattribute"
5049
)
@@ -61,18 +60,20 @@ type (
6160
BranchToken []byte
6261
NextEventID int64
6362
CloseFailoverVersion int64
64-
HistoryURI string
63+
// HistoryURI is the URI of the history archival backend.
64+
HistoryURI carchiver.URI
6565

6666
// visibility archival
6767
WorkflowTypeName string
68-
StartTime time.Time
69-
ExecutionTime time.Time
70-
CloseTime time.Time
68+
StartTime *time.Time
69+
ExecutionTime *time.Time
70+
CloseTime *time.Time
7171
Status enumspb.WorkflowExecutionStatus
7272
HistoryLength int64
7373
Memo *commonpb.Memo
7474
SearchAttributes *commonpb.SearchAttributes
75-
VisibilityURI string
75+
// VisibilityURI is the URI of the visibility archival backend.
76+
VisibilityURI carchiver.URI
7677

7778
// archival targets: history and/or visibility
7879
Targets []Target
@@ -148,7 +149,6 @@ func (a *archiver) Archive(ctx context.Context, request *Request) (res *Response
148149
Message: fmt.Sprintf("archival rate limited: %s", err.Error()),
149150
}
150151
}
151-
152152
var wg sync.WaitGroup
153153
errs := make([]error, len(request.Targets))
154154
for i, target := range request.Targets {
@@ -178,21 +178,16 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
178178
logger,
179179
tag.ArchivalRequestBranchToken(request.BranchToken),
180180
tag.ArchivalRequestCloseFailoverVersion(request.CloseFailoverVersion),
181-
tag.ArchivalURI(request.HistoryURI),
181+
tag.ArchivalURI(request.HistoryURI.String()),
182182
)
183183
defer a.recordArchiveTargetResult(logger, time.Now(), TargetHistory, &err)
184184

185-
URI, err := carchiver.NewURI(request.HistoryURI)
186-
if err != nil {
187-
return err
188-
}
189-
190-
historyArchiver, err := a.archiverProvider.GetHistoryArchiver(URI.Scheme(), request.CallerService)
185+
historyArchiver, err := a.archiverProvider.GetHistoryArchiver(request.HistoryURI.Scheme(), request.CallerService)
191186
if err != nil {
192187
return err
193188
}
194189

195-
return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{
190+
return historyArchiver.Archive(ctx, request.HistoryURI, &carchiver.ArchiveHistoryRequest{
196191
ShardID: request.ShardID,
197192
NamespaceID: request.NamespaceID,
198193
Namespace: request.Namespace,
@@ -207,16 +202,11 @@ func (a *archiver) archiveHistory(ctx context.Context, request *Request, logger
207202
func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logger log.Logger) (err error) {
208203
logger = log.With(
209204
logger,
210-
tag.ArchivalURI(request.VisibilityURI),
205+
tag.ArchivalURI(request.VisibilityURI.String()),
211206
)
212207
defer a.recordArchiveTargetResult(logger, time.Now(), TargetVisibility, &err)
213208

214-
uri, err := carchiver.NewURI(request.VisibilityURI)
215-
if err != nil {
216-
return err
217-
}
218-
219-
visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(uri.Scheme(), request.CallerService)
209+
visibilityArchiver, err := a.archiverProvider.GetVisibilityArchiver(request.VisibilityURI.Scheme(), request.CallerService)
220210
if err != nil {
221211
return err
222212
}
@@ -226,20 +216,20 @@ func (a *archiver) archiveVisibility(ctx context.Context, request *Request, logg
226216
if err != nil {
227217
return err
228218
}
229-
return visibilityArchiver.Archive(ctx, uri, &archiverspb.VisibilityRecord{
219+
return visibilityArchiver.Archive(ctx, request.VisibilityURI, &archiverspb.VisibilityRecord{
230220
NamespaceId: request.NamespaceID,
231221
Namespace: request.Namespace,
232222
WorkflowId: request.WorkflowID,
233223
RunId: request.RunID,
234224
WorkflowTypeName: request.WorkflowTypeName,
235-
StartTime: timestamp.TimePtr(request.StartTime),
236-
ExecutionTime: timestamp.TimePtr(request.ExecutionTime),
237-
CloseTime: timestamp.TimePtr(request.CloseTime),
225+
StartTime: request.StartTime,
226+
ExecutionTime: request.ExecutionTime,
227+
CloseTime: request.CloseTime,
238228
Status: request.Status,
239229
HistoryLength: request.HistoryLength,
240230
Memo: request.Memo,
241231
SearchAttributes: searchAttributes,
242-
HistoryArchivalUri: request.HistoryURI,
232+
HistoryArchivalUri: request.HistoryURI.String(),
243233
})
244234
}
245235

service/history/archival/archiver_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ func TestArchiver(t *testing.T) {
282282

283283
archiver := NewArchiver(archiverProvider, logRecorder, metricsHandler, rateLimiter)
284284
_, err = archiver.Archive(ctx, &Request{
285-
HistoryURI: historyURI.String(),
286-
VisibilityURI: visibilityURI.String(),
285+
HistoryURI: historyURI,
286+
VisibilityURI: visibilityURI,
287287
Targets: c.Targets,
288288
})
289289

0 commit comments

Comments
 (0)