Skip to content

Commit d4498d1

Browse files
authored
Expose history size to workflows (#3055)
1 parent 2d17cb8 commit d4498d1

14 files changed

+647
-269
lines changed

api/persistence/v1/executions.pb.go

+314-220
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

+6
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,16 @@ const (
132132
HistorySizeLimitError = "limit.historySize.error"
133133
// HistorySizeLimitWarn is the per workflow execution history size limit for warning
134134
HistorySizeLimitWarn = "limit.historySize.warn"
135+
// HistorySizeSuggestContinueAsNew is the workflow execution history size limit to suggest
136+
// continue-as-new (in workflow task started event)
137+
HistorySizeSuggestContinueAsNew = "limit.historySize.suggestContinueAsNew"
135138
// HistoryCountLimitError is the per workflow execution history event count limit
136139
HistoryCountLimitError = "limit.historyCount.error"
137140
// HistoryCountLimitWarn is the per workflow execution history event count limit for warning
138141
HistoryCountLimitWarn = "limit.historyCount.warn"
142+
// HistoryCountSuggestContinueAsNew is the workflow execution history event count limit to
143+
// suggest continue-as-new (in workflow task started event)
144+
HistoryCountSuggestContinueAsNew = "limit.historyCount.suggestContinueAsNew"
139145
// MaxIDLengthLimit is the length limit for various IDs, including: Namespace, TaskQueue, WorkflowID, ActivityID, TimerID,
140146
// WorkflowType, ActivityType, SignalName, MarkerName, ErrorReason/FailureReason/CancelCause, Identity, RequestID
141147
MaxIDLengthLimit = "limit.maxIDLength"

proto/internal/temporal/server/api/persistence/v1/executions.proto

+2
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ message WorkflowExecutionInfo {
100100
google.protobuf.Timestamp workflow_task_original_scheduled_time = 30 [(gogoproto.stdtime) = true];
101101
string workflow_task_request_id = 31;
102102
temporal.server.api.enums.v1.WorkflowTaskType workflow_task_type = 68;
103+
bool workflow_task_suggest_continue_as_new = 69;
104+
int64 workflow_task_history_size_bytes = 70;
103105

104106
bool cancel_requested = 29;
105107
string cancel_request_id = 32;

service/history/configs/config.go

+28-24
Original file line numberDiff line numberDiff line change
@@ -181,18 +181,20 @@ type Config struct {
181181
DurableArchivalEnabled dynamicconfig.BoolPropertyFn
182182

183183
// Size limit related settings
184-
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
185-
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
186-
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
187-
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
188-
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
189-
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
190-
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
191-
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
192-
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
193-
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
194-
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
195-
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
184+
BlobSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
185+
BlobSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
186+
MemoSizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
187+
MemoSizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
188+
HistorySizeLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
189+
HistorySizeLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
190+
HistorySizeSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
191+
HistoryCountLimitError dynamicconfig.IntPropertyFnWithNamespaceFilter
192+
HistoryCountLimitWarn dynamicconfig.IntPropertyFnWithNamespaceFilter
193+
HistoryCountSuggestContinueAsNew dynamicconfig.IntPropertyFnWithNamespaceFilter
194+
NumPendingChildExecutionsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
195+
NumPendingActivitiesLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
196+
NumPendingSignalsLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
197+
NumPendingCancelsRequestLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
196198

197199
// DefaultActivityRetryOptions specifies the out-of-box retry policy if
198200
// none is configured on the Activity by the user.
@@ -417,18 +419,20 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
417419
ArchiveSignalTimeout: dc.GetDurationProperty(dynamicconfig.ArchiveSignalTimeout, 300*time.Millisecond),
418420
DurableArchivalEnabled: dc.GetBoolProperty(dynamicconfig.DurableArchivalEnabled, true),
419421

420-
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
421-
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
422-
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
423-
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
424-
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 50000),
425-
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 50000),
426-
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 50000),
427-
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 50000),
428-
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
429-
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
430-
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
431-
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
422+
BlobSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitError, 2*1024*1024),
423+
BlobSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.BlobSizeLimitWarn, 512*1024),
424+
MemoSizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitError, 2*1024*1024),
425+
MemoSizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.MemoSizeLimitWarn, 2*1024),
426+
NumPendingChildExecutionsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingChildExecutionsLimitError, 50000),
427+
NumPendingActivitiesLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingActivitiesLimitError, 50000),
428+
NumPendingSignalsLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingSignalsLimitError, 50000),
429+
NumPendingCancelsRequestLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.NumPendingCancelRequestsLimitError, 50000),
430+
HistorySizeLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitError, 50*1024*1024),
431+
HistorySizeLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeLimitWarn, 10*1024*1024),
432+
HistorySizeSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistorySizeSuggestContinueAsNew, 4*1024*1024),
433+
HistoryCountLimitError: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitError, 50*1024),
434+
HistoryCountLimitWarn: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountLimitWarn, 10*1024),
435+
HistoryCountSuggestContinueAsNew: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryCountSuggestContinueAsNew, 4*1024),
432436

433437
ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.HistoryThrottledLogRPS, 4),
434438
EnableStickyQuery: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableStickyQuery, true),

service/history/workflow/history_builder.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -218,13 +218,17 @@ func (b *HistoryBuilder) AddWorkflowTaskStartedEvent(
218218
requestID string,
219219
identity string,
220220
startTime time.Time,
221+
suggestContinueAsNew bool,
222+
historySizeBytes int64,
221223
) *historypb.HistoryEvent {
222224
event := b.createNewHistoryEvent(enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, startTime)
223225
event.Attributes = &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
224226
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
225-
ScheduledEventId: scheduledEventID,
226-
Identity: identity,
227-
RequestId: requestID,
227+
ScheduledEventId: scheduledEventID,
228+
Identity: identity,
229+
RequestId: requestID,
230+
SuggestContinueAsNew: suggestContinueAsNew,
231+
HistorySizeBytes: historySizeBytes,
228232
},
229233
}
230234

service/history/workflow/history_builder_test.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,8 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() {
639639
testRequestID,
640640
testIdentity,
641641
s.now,
642+
false,
643+
123678,
642644
)
643645
s.Equal(event, s.flush())
644646
s.Equal(&historypb.HistoryEvent{
@@ -649,9 +651,11 @@ func (s *historyBuilderSuite) TestWorkflowTaskStarted() {
649651
Version: s.version,
650652
Attributes: &historypb.HistoryEvent_WorkflowTaskStartedEventAttributes{
651653
WorkflowTaskStartedEventAttributes: &historypb.WorkflowTaskStartedEventAttributes{
652-
ScheduledEventId: scheduledEventID,
653-
Identity: testIdentity,
654-
RequestId: testRequestID,
654+
ScheduledEventId: scheduledEventID,
655+
Identity: testIdentity,
656+
RequestId: testRequestID,
657+
SuggestContinueAsNew: false,
658+
HistorySizeBytes: 123678,
655659
},
656660
},
657661
}, event)

service/history/workflow/mutable_state.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,14 @@ type (
8989

9090
// Indicate type of the current workflow task (normal, transient, or speculative).
9191
Type enumsspb.WorkflowTaskType
92+
93+
// These two fields are sent to workers in the WorkflowTaskStarted event. We need to save a
94+
// copy in mutable state to know the last values we sent (which might have been in a
95+
// transient event), otherwise a dynamic config change of the suggestion threshold could
96+
// cause the WorkflowTaskStarted event that the worker used to not match the event we saved
97+
// in history.
98+
SuggestContinueAsNew bool
99+
HistorySizeBytes int64
92100
}
93101

94102
MutableState interface {
@@ -224,7 +232,7 @@ type (
224232
ReplicateWorkflowTaskCompletedEvent(*historypb.HistoryEvent) error
225233
ReplicateWorkflowTaskFailedEvent() error
226234
ReplicateWorkflowTaskScheduledEvent(int64, int64, *taskqueuepb.TaskQueue, *time.Duration, int32, *time.Time, *time.Time, enumsspb.WorkflowTaskType) (*WorkflowTaskInfo, error)
227-
ReplicateWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time) (*WorkflowTaskInfo, error)
235+
ReplicateWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time, bool, int64) (*WorkflowTaskInfo, error)
228236
ReplicateWorkflowTaskTimedOutEvent(enumspb.TimeoutType) error
229237
ReplicateExternalWorkflowExecutionCancelRequested(*historypb.HistoryEvent) error
230238
ReplicateExternalWorkflowExecutionSignaled(*historypb.HistoryEvent) error

service/history/workflow/mutable_state_impl.go

+7-2
Original file line numberDiff line numberDiff line change
@@ -1330,6 +1330,9 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {
13301330
TaskQueue: nil,
13311331
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
13321332
Type: enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED,
1333+
1334+
SuggestContinueAsNew: false,
1335+
HistorySizeBytes: 0,
13331336
}
13341337
ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo)
13351338
return nil
@@ -1792,9 +1795,11 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskStartedEvent(
17921795
startedEventID int64,
17931796
requestID string,
17941797
timestamp time.Time,
1798+
suggestContinueAsNew bool,
1799+
historySizeBytes int64,
17951800
) (*WorkflowTaskInfo, error) {
1796-
1797-
return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID, startedEventID, requestID, timestamp)
1801+
return ms.workflowTaskManager.ReplicateWorkflowTaskStartedEvent(workflowTask, version, scheduledEventID,
1802+
startedEventID, requestID, timestamp, suggestContinueAsNew, historySizeBytes)
17981803
}
17991804

18001805
// TODO (alex-update): Transient needs to be renamed to "TransientOrSpeculative"

service/history/workflow/mutable_state_impl_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,8 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchRepl
597597
workflowTaskStartedEvent.GetEventId(),
598598
workflowTaskStartedEvent.GetWorkflowTaskStartedEventAttributes().GetRequestId(),
599599
timestamp.TimeValue(workflowTaskStartedEvent.GetEventTime()),
600+
false,
601+
123678,
600602
)
601603
s.Nil(err)
602604
s.NotNil(wt)
@@ -649,6 +651,8 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchRepl
649651
newWorkflowTaskStartedEvent.GetEventId(),
650652
newWorkflowTaskStartedEvent.GetWorkflowTaskStartedEventAttributes().GetRequestId(),
651653
timestamp.TimeValue(newWorkflowTaskStartedEvent.GetEventTime()),
654+
false,
655+
123678,
652656
)
653657
s.Nil(err)
654658
s.NotNil(wt)

service/history/workflow/mutable_state_mock.go

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/workflow/mutable_state_rebuilder.go

+2
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ func (b *MutableStateRebuilderImpl) ApplyEvents(
216216
event.GetEventId(),
217217
attributes.GetRequestId(),
218218
timestamp.TimeValue(event.GetEventTime()),
219+
attributes.GetSuggestContinueAsNew(),
220+
attributes.GetHistorySizeBytes(),
219221
)
220222
if err != nil {
221223
return nil, err

service/history/workflow/mutable_state_rebuilder_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskStarted() {
808808
}
809809
s.mockMutableState.EXPECT().ReplicateWorkflowTaskStartedEvent(
810810
(*WorkflowTaskInfo)(nil), event.GetVersion(), scheduledEventID, event.GetEventId(), workflowTaskRequestID, timestamp.TimeValue(event.GetEventTime()),
811+
false, gomock.Any(),
811812
).Return(wt, nil)
812813
s.mockUpdateVersion(event)
813814
s.mockTaskGenerator.EXPECT().GenerateStartWorkflowTaskTasks(

0 commit comments

Comments
 (0)