Skip to content

Commit 25015d4

Browse files
authored
Emit lag metrics for all registered history queues (#3851)
1 parent 9d3fede commit 25015d4

File tree

5 files changed

+46
-66
lines changed

5 files changed

+46
-66
lines changed

common/dynamicconfig/constants.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ const (
546546
ShardUpdateMinInterval = "history.shardUpdateMinInterval"
547547
// ShardSyncMinInterval is the minimal time interval which the shard info should be sync to remote
548548
ShardSyncMinInterval = "history.shardSyncMinInterval"
549-
// EmitShardDiffLog whether emit the shard diff log
550-
EmitShardDiffLog = "history.emitShardDiffLog"
549+
// EmitShardLagLog whether emit the shard lag log
550+
EmitShardLagLog = "history.emitShardLagLog"
551551
// DefaultEventEncoding is the encoding type for history events
552552
DefaultEventEncoding = "history.defaultEventEncoding"
553553
// NumArchiveSystemWorkflows is key for number of archive system workflows running in total

common/metrics/metric_defs.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -1410,10 +1410,8 @@ var (
14101410
ShardInfoTransferLagHistogram = NewDimensionlessHistogramDef("shardinfo_transfer_lag")
14111411
ShardInfoTimerLagTimer = NewTimerDef("shardinfo_timer_lag")
14121412
ShardInfoVisibilityLagHistogram = NewDimensionlessHistogramDef("shardinfo_visibility_lag")
1413-
ShardInfoTransferDiffHistogram = NewDimensionlessHistogramDef("shardinfo_transfer_diff")
1414-
ShardInfoTimerDiffTimer = NewTimerDef("shardinfo_timer_diff")
1415-
ShardInfoTransferFailoverInProgressHistogram = NewDimensionlessHistogramDef("shardinfo_transfer_failover_in_progress")
1416-
ShardInfoTimerFailoverInProgressHistogram = NewDimensionlessHistogramDef("shardinfo_timer_failover_in_progress")
1413+
ShardInfoImmediateQueueLagHistogram = NewDimensionlessHistogramDef("shardinfo_immediate_queue_lag")
1414+
ShardInfoScheduledQueueLagTimer = NewTimerDef("shardinfo_scheduled_queue_lag")
14171415
ShardInfoTransferFailoverLatencyTimer = NewTimerDef("shardinfo_transfer_failover_latency")
14181416
ShardInfoTimerFailoverLatencyTimer = NewTimerDef("shardinfo_timer_failover_latency")
14191417
SyncShardFromRemoteCounter = NewCounterDef("syncshard_remote_count")

common/persistence/dataInterfaces.go

+2
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ type (
170170
}
171171

172172
// FailoverLevel contains corresponding start / end level
173+
// TODO: remove FailoverLevel definition, they are only used by
174+
// old queue processing logic
173175
FailoverLevel struct {
174176
StartTime time.Time
175177
MinLevel tasks.Key

service/history/configs/config.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type Config struct {
5757
EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
5858
VisibilityDisableOrderByClause dynamicconfig.BoolPropertyFn
5959

60-
EmitShardDiffLog dynamicconfig.BoolPropertyFn
60+
EmitShardLagLog dynamicconfig.BoolPropertyFn
6161
MaxAutoResetPoints dynamicconfig.IntPropertyFnWithNamespaceFilter
6262
ThrottledLogRPS dynamicconfig.IntPropertyFn
6363
EnableStickyQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter
@@ -320,7 +320,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
320320
EnableReadFromSecondaryAdvancedVisibility: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableReadFromSecondaryAdvancedVisibility, false),
321321
VisibilityDisableOrderByClause: dc.GetBoolProperty(dynamicconfig.VisibilityDisableOrderByClause, false),
322322

323-
EmitShardDiffLog: dc.GetBoolProperty(dynamicconfig.EmitShardDiffLog, false),
323+
EmitShardLagLog: dc.GetBoolProperty(dynamicconfig.EmitShardLagLog, false),
324324
HistoryCacheInitialSize: dc.GetIntProperty(dynamicconfig.HistoryCacheInitialSize, 128),
325325
HistoryCacheMaxSize: dc.GetIntProperty(dynamicconfig.HistoryCacheMaxSize, 512),
326326
HistoryCacheTTL: dc.GetDurationProperty(dynamicconfig.HistoryCacheTTL, time.Hour),

service/history/shard/context_impl.go

+38-58
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,10 @@ var (
172172
)
173173

174174
const (
175-
logWarnImmediateTaskLevelDiff = 3000000 // 3 million
176-
logWarnScheduledTaskLevelDiff = time.Duration(30 * time.Minute)
177-
historySizeLogThreshold = 10 * 1024 * 1024
178-
minContextTimeout = 2 * time.Second
175+
logWarnImmediateTaskLag = 3000000 // 3 million
176+
logWarnScheduledTaskLag = time.Duration(30 * time.Minute)
177+
historySizeLogThreshold = 10 * 1024 * 1024
178+
minContextTimeout = 2 * time.Second
179179
)
180180

181181
func (s *ContextImpl) String() string {
@@ -1276,58 +1276,40 @@ func (s *ContextImpl) updateShardInfoLocked() error {
12761276
return nil
12771277
}
12781278

1279-
// TODO: Instead of having separate metric definition for each task category, we should
1280-
// use one metrics (or two, one for immedidate task, one for scheduled task),
1281-
// and add tags indicating the task category.
12821279
func (s *ContextImpl) emitShardInfoMetricsLogsLocked() {
1283-
currentCluster := s.GetClusterMetadata().GetCurrentClusterName()
1284-
clusterInfo := s.GetClusterMetadata().GetAllClusterInfo()
1285-
1286-
minTransferLevel := s.getQueueClusterAckLevelLocked(tasks.CategoryTransfer, currentCluster) // s.shardInfo.ClusterTransferAckLevel[currentCluster]
1287-
maxTransferLevel := minTransferLevel
1288-
minTimerLevel := s.getQueueClusterAckLevelLocked(tasks.CategoryTimer, currentCluster)
1289-
maxTimerLevel := minTimerLevel
1290-
for clusterName, info := range clusterInfo {
1291-
if !info.Enabled {
1292-
continue
1293-
}
1280+
handler := s.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ShardInfoScope))
1281+
logWarnLagExceeded := false
12941282

1295-
clusterTransferLevel := s.getQueueClusterAckLevelLocked(tasks.CategoryTransfer, clusterName)
1296-
if clusterTransferLevel.CompareTo(minTransferLevel) < 0 {
1297-
minTransferLevel = clusterTransferLevel
1298-
}
1299-
if clusterTransferLevel.CompareTo(maxTransferLevel) > 0 {
1300-
maxTransferLevel = clusterTransferLevel
1283+
for categoryID := range s.shardInfo.QueueAckLevels {
1284+
category, ok := tasks.GetCategoryByID(categoryID)
1285+
if !ok {
1286+
continue
13011287
}
13021288

1303-
clusterTimerLevel := s.getQueueClusterAckLevelLocked(tasks.CategoryTimer, clusterName)
1304-
if clusterTimerLevel.CompareTo(minTimerLevel) < 0 {
1305-
minTimerLevel = clusterTimerLevel
1306-
}
1307-
if clusterTimerLevel.CompareTo(maxTimerLevel) > 0 {
1308-
maxTimerLevel = clusterTimerLevel
1289+
switch category.Type() {
1290+
case tasks.CategoryTypeImmediate:
1291+
lag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(category).TaskID - 1
1292+
if lag > logWarnImmediateTaskLag {
1293+
logWarnLagExceeded = true
1294+
}
1295+
handler.Histogram(
1296+
metrics.ShardInfoImmediateQueueLagHistogram.GetMetricName(),
1297+
metrics.ShardInfoImmediateQueueLagHistogram.GetMetricUnit(),
1298+
).Record(lag, metrics.TaskCategoryTag(category.Name()))
1299+
case tasks.CategoryTypeScheduled:
1300+
lag := s.scheduledTaskMaxReadLevel.Sub(s.getQueueAckLevelLocked(category).FireTime)
1301+
if lag > logWarnScheduledTaskLag {
1302+
logWarnLagExceeded = true
1303+
}
1304+
handler.Timer(
1305+
metrics.ShardInfoScheduledQueueLagTimer.GetMetricName(),
1306+
).Record(lag, metrics.TaskCategoryTag(category.Name()))
1307+
default:
1308+
s.contextTaggedLogger.Error("Unknown task category type", tag.NewStringTag("task-category", category.Type().String()))
13091309
}
13101310
}
13111311

1312-
diffTransferLevel := maxTransferLevel.TaskID - minTransferLevel.TaskID
1313-
diffTimerLevel := maxTimerLevel.FireTime.Sub(minTimerLevel.FireTime)
1314-
1315-
replicationLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryReplication).TaskID - 1
1316-
transferLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryTransfer).TaskID - 1
1317-
timerLag := s.timeSource.Now().Sub(s.getQueueAckLevelLocked(tasks.CategoryTimer).FireTime)
1318-
visibilityLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryVisibility).TaskID - 1
1319-
1320-
transferFailoverInProgress := len(s.shardInfo.FailoverLevels[tasks.CategoryTransfer])
1321-
timerFailoverInProgress := len(s.shardInfo.FailoverLevels[tasks.CategoryTimer])
1322-
1323-
if s.config.EmitShardDiffLog() &&
1324-
(logWarnImmediateTaskLevelDiff < diffTransferLevel ||
1325-
logWarnScheduledTaskLevelDiff < diffTimerLevel ||
1326-
logWarnImmediateTaskLevelDiff < transferLag ||
1327-
logWarnScheduledTaskLevelDiff < timerLag ||
1328-
logWarnImmediateTaskLevelDiff < visibilityLag ||
1329-
logWarnImmediateTaskLevelDiff < replicationLag) {
1330-
1312+
if logWarnLagExceeded && s.config.EmitShardLagLog() {
13311313
ackLevelTags := make([]tag.Tag, 0, len(s.shardInfo.QueueAckLevels))
13321314
for categoryID, ackLevel := range s.shardInfo.QueueAckLevels {
13331315
category, ok := tasks.GetCategoryByID(categoryID)
@@ -1336,22 +1318,20 @@ func (s *ContextImpl) emitShardInfoMetricsLogsLocked() {
13361318
}
13371319
ackLevelTags = append(ackLevelTags, tag.ShardQueueAcks(category.Name(), ackLevel))
13381320
}
1339-
s.contextTaggedLogger.Warn("Shard ack levels diff exceeds warn threshold.", ackLevelTags...)
1321+
s.contextTaggedLogger.Warn("Shard ack levels lag exceeds warn threshold.", ackLevelTags...)
13401322
}
13411323

1342-
handler := s.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ShardInfoScope))
1343-
handler.Histogram(metrics.ShardInfoTransferDiffHistogram.GetMetricName(), metrics.ShardInfoTransferDiffHistogram.GetMetricUnit()).Record(diffTransferLevel)
1344-
handler.Timer(metrics.ShardInfoTimerDiffTimer.GetMetricName()).Record(diffTimerLevel)
1324+
// Following metrics are double-emitted for backward compatibility so that old dashboard/alert won't break
1325+
// TODO: remove in 1.21 release
1326+
replicationLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryReplication).TaskID - 1
1327+
transferLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryTransfer).TaskID - 1
1328+
visibilityLag := s.immediateTaskExclusiveMaxReadLevel - s.getQueueAckLevelLocked(tasks.CategoryVisibility).TaskID - 1
1329+
timerLag := s.timeSource.Now().Sub(s.getQueueAckLevelLocked(tasks.CategoryTimer).FireTime)
13451330

13461331
handler.Histogram(metrics.ShardInfoReplicationLagHistogram.GetMetricName(), metrics.ShardInfoReplicationLagHistogram.GetMetricUnit()).Record(replicationLag)
13471332
handler.Histogram(metrics.ShardInfoTransferLagHistogram.GetMetricName(), metrics.ShardInfoTransferLagHistogram.GetMetricUnit()).Record(transferLag)
13481333
handler.Histogram(metrics.ShardInfoVisibilityLagHistogram.GetMetricName(), metrics.ShardInfoVisibilityLagHistogram.GetMetricUnit()).Record(visibilityLag)
13491334
handler.Timer(metrics.ShardInfoTimerLagTimer.GetMetricName()).Record(timerLag)
1350-
1351-
handler.Histogram(metrics.ShardInfoTransferFailoverInProgressHistogram.GetMetricName(), metrics.ShardInfoTransferFailoverInProgressHistogram.GetMetricUnit()).
1352-
Record(int64(transferFailoverInProgress))
1353-
handler.Histogram(metrics.ShardInfoTimerFailoverInProgressHistogram.GetMetricName(), metrics.ShardInfoTimerFailoverInProgressHistogram.GetMetricUnit()).
1354-
Record(int64(timerFailoverInProgress))
13551335
}
13561336

13571337
func (s *ContextImpl) allocateTaskIDAndTimestampLocked(

0 commit comments

Comments
 (0)