@@ -36,6 +36,7 @@ import (
36
36
"go.temporal.io/server/common"
37
37
"go.temporal.io/server/common/backoff"
38
38
"go.temporal.io/server/common/clock"
39
+ "go.temporal.io/server/common/cluster"
39
40
"go.temporal.io/server/common/headers"
40
41
"go.temporal.io/server/common/log"
41
42
"go.temporal.io/server/common/log/tag"
@@ -110,6 +111,7 @@ type (
110
111
priorityAssigner PriorityAssigner
111
112
timeSource clock.TimeSource
112
113
namespaceRegistry namespace.Registry
114
+ clusterMetadata cluster.Metadata
113
115
114
116
readerID int32
115
117
loadTime time.Time
@@ -135,6 +137,7 @@ func NewExecutable(
135
137
priorityAssigner PriorityAssigner ,
136
138
timeSource clock.TimeSource ,
137
139
namespaceRegistry namespace.Registry ,
140
+ clusterMetadata cluster.Metadata ,
138
141
logger log.Logger ,
139
142
metricsHandler metrics.Handler ,
140
143
) Executable {
@@ -148,6 +151,7 @@ func NewExecutable(
148
151
priorityAssigner : priorityAssigner ,
149
152
timeSource : timeSource ,
150
153
namespaceRegistry : namespaceRegistry ,
154
+ clusterMetadata : clusterMetadata ,
151
155
readerID : readerID ,
152
156
loadTime : util .MaxTime (timeSource .Now (), task .GetKey ().FireTime ),
153
157
logger : log .NewLazyLogger (
@@ -163,15 +167,29 @@ func NewExecutable(
163
167
return executable
164
168
}
165
169
166
- func (e * executableImpl ) Execute () error {
170
+ func (e * executableImpl ) Execute () ( retErr error ) {
167
171
if e .State () == ctasks .TaskStateCancelled {
168
172
return nil
169
173
}
170
174
171
- ctx := metrics .AddMetricsContext (context .Background ())
172
- namespace , _ := e .namespaceRegistry .GetNamespaceName (namespace .ID (e .GetNamespaceID ()))
175
+ namespaceName , _ := e .namespaceRegistry .GetNamespaceName (namespace .ID (e .GetNamespaceID ()))
176
+ ctx := headers .SetCallerInfo (
177
+ metrics .AddMetricsContext (context .Background ()),
178
+ headers .NewBackgroundCallerInfo (namespaceName .String ()),
179
+ )
180
+
181
+ var panicErr error
182
+ defer func () {
183
+ if panicErr != nil {
184
+ retErr = panicErr
185
+
186
+ // we need to guess the metrics tags here as we don't know which execution logic
187
+ // is actually used, which is up to the executor implementation
188
+ e .taggedMetricsHandler = e .metricsHandler .WithTags (e .estimateTaskMetricTag ()... )
189
+ }
190
+ }()
173
191
174
- ctx = headers . SetCallerInfo ( ctx , headers . NewBackgroundCallerInfo ( namespace . String ()) )
192
+ defer log . CapturePanic ( e . logger , & panicErr )
175
193
176
194
startTime := e .timeSource .Now ()
177
195
@@ -296,6 +314,12 @@ func (e *executableImpl) IsRetryableError(err error) bool {
296
314
return false
297
315
}
298
316
317
+ // Internal error is non-retryable and usually means unexpected error has happened,
318
+ // e.g. unknown task, corrupted state, panic etc.
319
+ if common .IsInternalError (err ) {
320
+ return false
321
+ }
322
+
299
323
// ErrTaskRetry means mutable state is not ready for standby task processing
300
324
// there's no point in retrying the task immediately, which will hold the worker coroutine
301
325
// TODO: change ErrTaskRetry to a better name
@@ -424,6 +448,10 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
424
448
return false
425
449
}
426
450
451
+ if common .IsInternalError (err ) {
452
+ return false
453
+ }
454
+
427
455
return err != consts .ErrTaskRetry &&
428
456
err != consts .ErrDependencyTaskNotCompleted &&
429
457
err != consts .ErrNamespaceHandover
@@ -433,13 +461,14 @@ func (e *executableImpl) rescheduleTime(
433
461
err error ,
434
462
attempt int ,
435
463
) time.Time {
436
- // elapsedTime ( the first parameter in ComputeNextDelay) is not relevant here
464
+ // elapsedTime, the first parameter in ComputeNextDelay, is not relevant here
437
465
// since reschedule policy has no expiration interval.
438
466
439
- if err == consts .ErrTaskRetry || err == consts .ErrNamespaceHandover {
467
+ if err == consts .ErrTaskRetry ||
468
+ err == consts .ErrNamespaceHandover ||
469
+ common .IsInternalError (err ) {
440
470
// using a different reschedule policy to slow down retry
441
- // as the error means mutable state or namespace is not ready to handle the task,
442
- // need to wait for replication.
471
+ // as immediate retry typically won't resolve the issue.
443
472
return e .timeSource .Now ().Add (taskNotReadyReschedulePolicy .ComputeNextDelay (0 , attempt ))
444
473
}
445
474
@@ -469,3 +498,21 @@ func (e *executableImpl) updatePriority() {
469
498
e .lowestPriority = e .priority
470
499
}
471
500
}
501
+
502
+ func (e * executableImpl ) estimateTaskMetricTag () []metrics.Tag {
503
+ namespaceTag := metrics .NamespaceUnknownTag ()
504
+ isActive := true
505
+
506
+ namespace , err := e .namespaceRegistry .GetNamespaceByID (namespace .ID (e .GetNamespaceID ()))
507
+ if err == nil {
508
+ namespaceTag = metrics .NamespaceTag (namespace .Name ().String ())
509
+ isActive = namespace .ActiveInCluster (e .clusterMetadata .GetCurrentClusterName ())
510
+ }
511
+
512
+ taskType := getTaskTypeTagValue (e .Task , isActive )
513
+ return []metrics.Tag {
514
+ namespaceTag ,
515
+ metrics .TaskTypeTag (taskType ),
516
+ metrics .OperationTag (taskType ), // for backward compatibility
517
+ }
518
+ }
0 commit comments