Skip to content

Commit d353dcc

Browse files
authored
Emit inordered buffered events metric (#3949)
1 parent 0a22e1c commit d353dcc

File tree

4 files changed

+59
-0
lines changed

4 files changed

+59
-0
lines changed

common/metrics/metric_defs.go

+1
Original file line numberDiff line numberDiff line change
@@ -1474,6 +1474,7 @@ var (
14741474
ShardLockLatency = NewTimerDef("shard_lock_latency")
14751475
NamespaceRegistryLockLatency = NewTimerDef("namespace_registry_lock_latency")
14761476
ClosedWorkflowBufferEventCount = NewCounterDef("closed_workflow_buffer_event_counter")
1477+
InorderBufferedEventsCounter = NewCounterDef("inordered_buffered_events")
14771478

14781479
// Matching
14791480
MatchingClientForwardedCounter = NewCounterDef("forwarded")

service/history/workflow/history_builder.go

+50
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"go.temporal.io/server/api/historyservice/v1"
4040
"go.temporal.io/server/common"
4141
"go.temporal.io/server/common/clock"
42+
"go.temporal.io/server/common/metrics"
4243
"go.temporal.io/server/common/namespace"
4344
"go.temporal.io/server/common/primitives/timestamp"
4445
)
@@ -90,6 +91,8 @@ type (
9091

9192
// scheduled to started event ID mapping
9293
scheduledIDToStartedID map[int64]int64
94+
95+
metricsHandler metrics.Handler
9396
}
9497
)
9598

@@ -99,6 +102,7 @@ func NewMutableHistoryBuilder(
99102
version int64,
100103
nextEventID int64,
101104
dbBufferBatch []*historypb.HistoryEvent,
105+
metricsHandler metrics.Handler,
102106
) *HistoryBuilder {
103107
return &HistoryBuilder{
104108
state: HistoryBuilderStateMutable,
@@ -116,6 +120,8 @@ func NewMutableHistoryBuilder(
116120
memLatestBatch: nil,
117121
memBufferBatch: nil,
118122
scheduledIDToStartedID: make(map[int64]int64),
123+
124+
metricsHandler: metricsHandler,
119125
}
120126
}
121127

@@ -139,6 +145,8 @@ func NewImmutableHistoryBuilder(
139145
memLatestBatch: history,
140146
memBufferBatch: nil,
141147
scheduledIDToStartedID: nil,
148+
149+
metricsHandler: nil,
142150
}
143151
}
144152

@@ -1401,6 +1409,7 @@ func (b *HistoryBuilder) wireEventIDs(
14011409
func (b *HistoryBuilder) reorderBuffer(
14021410
bufferEvents []*historypb.HistoryEvent,
14031411
) []*historypb.HistoryEvent {
1412+
b.emitInorderedBufferedEvents(bufferEvents)
14041413
reorderBuffer := make([]*historypb.HistoryEvent, 0, len(bufferEvents))
14051414
reorderEvents := make([]*historypb.HistoryEvent, 0, len(bufferEvents))
14061415
for _, event := range bufferEvents {
@@ -1423,6 +1432,47 @@ func (b *HistoryBuilder) reorderBuffer(
14231432
return append(reorderEvents, reorderBuffer...)
14241433
}
14251434

1435+
func (b *HistoryBuilder) emitInorderedBufferedEvents(bufferedEvents []*historypb.HistoryEvent) {
1436+
completedActivities := make(map[int64]struct{})
1437+
completedChildWorkflows := make(map[int64]struct{})
1438+
var inorderedEventsCount int64
1439+
for _, event := range bufferedEvents {
1440+
switch event.GetEventType() {
1441+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_STARTED:
1442+
if _, seenCompleted := completedActivities[event.GetEventId()]; seenCompleted {
1443+
inorderedEventsCount++
1444+
}
1445+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
1446+
completedActivities[event.GetActivityTaskCompletedEventAttributes().GetStartedEventId()] = struct{}{}
1447+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED:
1448+
completedActivities[event.GetActivityTaskFailedEventAttributes().GetStartedEventId()] = struct{}{}
1449+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
1450+
completedActivities[event.GetActivityTaskTimedOutEventAttributes().GetStartedEventId()] = struct{}{}
1451+
case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCELED:
1452+
completedActivities[event.GetActivityTaskCanceledEventAttributes().GetStartedEventId()] = struct{}{}
1453+
1454+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED:
1455+
if _, seenCompleted := completedChildWorkflows[event.GetEventId()]; seenCompleted {
1456+
inorderedEventsCount++
1457+
}
1458+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED:
1459+
completedChildWorkflows[event.GetChildWorkflowExecutionCompletedEventAttributes().GetStartedEventId()] = struct{}{}
1460+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED:
1461+
completedChildWorkflows[event.GetChildWorkflowExecutionFailedEventAttributes().GetStartedEventId()] = struct{}{}
1462+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT:
1463+
completedChildWorkflows[event.GetChildWorkflowExecutionTimedOutEventAttributes().GetStartedEventId()] = struct{}{}
1464+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED:
1465+
completedChildWorkflows[event.GetChildWorkflowExecutionCanceledEventAttributes().GetStartedEventId()] = struct{}{}
1466+
case enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED:
1467+
completedChildWorkflows[event.GetChildWorkflowExecutionTerminatedEventAttributes().GetStartedEventId()] = struct{}{}
1468+
}
1469+
}
1470+
1471+
if inorderedEventsCount > 0 && b.metricsHandler != nil {
1472+
b.metricsHandler.Counter(metrics.InorderBufferedEventsCounter.GetMetricName()).Record(inorderedEventsCount)
1473+
}
1474+
}
1475+
14261476
func (b *HistoryBuilder) HasActivityFinishEvent(
14271477
scheduledEventID int64,
14281478
) bool {

service/history/workflow/history_builder_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
workflowspb "go.temporal.io/server/api/workflow/v1"
4646
"go.temporal.io/server/common"
4747
"go.temporal.io/server/common/clock"
48+
"go.temporal.io/server/common/metrics"
4849
"go.temporal.io/server/common/namespace"
4950
"go.temporal.io/server/common/primitives/timestamp"
5051
"go.temporal.io/server/service/history/tests"
@@ -154,6 +155,7 @@ func (s *historyBuilderSuite) SetupTest() {
154155
s.version,
155156
s.nextEventID,
156157
nil,
158+
metrics.NoopMetricsHandler,
157159
)
158160
}
159161

@@ -2088,6 +2090,7 @@ func (s *historyBuilderSuite) testWireEventIDs(
20882090
s.version,
20892091
s.nextEventID,
20902092
nil,
2093+
metrics.NoopMetricsHandler,
20912094
)
20922095
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{startEvent}
20932096
s.historyBuilder.memEventsBatches = nil
@@ -2134,6 +2137,7 @@ func (s *historyBuilderSuite) TestHasBufferEvent() {
21342137
s.version,
21352138
s.nextEventID,
21362139
nil,
2140+
metrics.NoopMetricsHandler,
21372141
)
21382142
historyBuilder.dbBufferBatch = nil
21392143
historyBuilder.memEventsBatches = nil

service/history/workflow/mutable_state_impl.go

+4
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ func NewMutableState(
265265
s.currentVersion,
266266
common.FirstEventID,
267267
s.bufferEventsInDB,
268+
s.metricsHandler,
268269
)
269270
s.taskGenerator = taskGeneratorProvider.NewTaskGenerator(shard, s)
270271
s.workflowTaskManager = newWorkflowTaskStateMachine(s)
@@ -326,6 +327,7 @@ func newMutableStateFromDB(
326327
common.EmptyVersion,
327328
dbRecord.NextEventId,
328329
dbRecord.BufferedEvents,
330+
mutableState.metricsHandler,
329331
)
330332

331333
mutableState.currentVersion = common.EmptyVersion
@@ -527,6 +529,7 @@ func (ms *MutableStateImpl) UpdateCurrentVersion(
527529
ms.currentVersion,
528530
ms.nextEventIDInDB,
529531
ms.bufferEventsInDB,
532+
ms.metricsHandler,
530533
)
531534

532535
return nil
@@ -4140,6 +4143,7 @@ func (ms *MutableStateImpl) cleanupTransaction(
41404143
ms.GetCurrentVersion(),
41414144
ms.nextEventIDInDB,
41424145
ms.bufferEventsInDB,
4146+
ms.metricsHandler,
41434147
)
41444148

41454149
ms.InsertTasks = make(map[tasks.Category][]tasks.Task)

0 commit comments

Comments
 (0)