Skip to content

Commit c94d2bf

Browse files
Refactor history cache to its own package (#3601)
* Refactor history cache * Refactor DeleteManager
1 parent 4907ff9 commit c94d2bf

File tree

70 files changed

+464
-374
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+464
-374
lines changed

service/history/api/consistency_checker.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ import (
4141
"go.temporal.io/server/service/history/shard"
4242
"go.temporal.io/server/service/history/vclock"
4343
"go.temporal.io/server/service/history/workflow"
44+
wcache "go.temporal.io/server/service/history/workflow/cache"
4445
)
4546

4647
type (
4748
MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool
4849

4950
WorkflowConsistencyChecker interface {
50-
GetWorkflowCache() workflow.Cache
51+
GetWorkflowCache() wcache.Cache
5152
GetCurrentRunID(
5253
ctx context.Context,
5354
namespaceID string,
@@ -63,21 +64,21 @@ type (
6364

6465
WorkflowConsistencyCheckerImpl struct {
6566
shardContext shard.Context
66-
workflowCache workflow.Cache
67+
workflowCache wcache.Cache
6768
}
6869
)
6970

7071
func NewWorkflowConsistencyChecker(
7172
shardContext shard.Context,
72-
workflowCache workflow.Cache,
73+
workflowCache wcache.Cache,
7374
) *WorkflowConsistencyCheckerImpl {
7475
return &WorkflowConsistencyCheckerImpl{
7576
shardContext: shardContext,
7677
workflowCache: workflowCache,
7778
}
7879
}
7980

80-
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() workflow.Cache {
81+
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() wcache.Cache {
8182
return c.workflowCache
8283
}
8384

service/history/api/consistency_checker_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"go.temporal.io/server/common/persistence/versionhistory"
4646
"go.temporal.io/server/service/history/shard"
4747
"go.temporal.io/server/service/history/workflow"
48+
wcache "go.temporal.io/server/service/history/workflow/cache"
4849
)
4950

5051
type (
@@ -54,7 +55,7 @@ type (
5455

5556
controller *gomock.Controller
5657
shardContext *shard.MockContext
57-
workflowCache *workflow.MockCache
58+
workflowCache *wcache.MockCache
5859

5960
shardID int32
6061
namespaceID string
@@ -82,7 +83,7 @@ func (s *workflowConsistencyCheckerSuite) SetupTest() {
8283

8384
s.controller = gomock.NewController(s.T())
8485
s.shardContext = shard.NewMockContext(s.controller)
85-
s.workflowCache = workflow.NewMockCache(s.controller)
86+
s.workflowCache = wcache.NewMockCache(s.controller)
8687

8788
s.shardID = rand.Int31()
8889
s.namespaceID = uuid.New().String()

service/history/api/create_workflow_util.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"go.temporal.io/server/common/rpc/interceptor"
4545
"go.temporal.io/server/service/history/shard"
4646
"go.temporal.io/server/service/history/workflow"
47+
wcache "go.temporal.io/server/service/history/workflow/cache"
4748
)
4849

4950
type (
@@ -117,7 +118,7 @@ func NewWorkflowWithSignal(
117118
),
118119
shard.GetLogger(),
119120
)
120-
return NewWorkflowContext(newWorkflowContext, workflow.NoopReleaseFn, newMutableState), nil
121+
return NewWorkflowContext(newWorkflowContext, wcache.NoopReleaseFn, newMutableState), nil
121122
}
122123

123124
func CreateMutableState(

service/history/api/deleteworkflow/api.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"go.temporal.io/server/common/namespace"
3535
"go.temporal.io/server/service/history/api"
3636
"go.temporal.io/server/service/history/consts"
37+
"go.temporal.io/server/service/history/deletemanager"
3738
"go.temporal.io/server/service/history/shard"
3839
"go.temporal.io/server/service/history/workflow"
3940
)
@@ -43,7 +44,7 @@ func Invoke(
4344
request *historyservice.DeleteWorkflowExecutionRequest,
4445
shard shard.Context,
4546
workflowConsistencyChecker api.WorkflowConsistencyChecker,
46-
workflowDeleteManager workflow.DeleteManager,
47+
workflowDeleteManager deletemanager.DeleteManager,
4748
) (_ *historyservice.DeleteWorkflowExecutionResponse, retError error) {
4849
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
4950
ctx,

service/history/api/reapplyevents/api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
"go.temporal.io/server/service/history/api"
4242
"go.temporal.io/server/service/history/ndc"
4343
"go.temporal.io/server/service/history/shard"
44-
"go.temporal.io/server/service/history/workflow"
44+
wcache "go.temporal.io/server/service/history/workflow/cache"
4545
)
4646

4747
func Invoke(
@@ -156,7 +156,7 @@ func Invoke(
156156
shard.GetClusterMetadata(),
157157
context,
158158
mutableState,
159-
workflow.NoopReleaseFn,
159+
wcache.NoopReleaseFn,
160160
),
161161
ndc.EventsReapplicationResetWorkflowReason,
162162
toReapplyEvents,

service/history/api/signalwithstartworkflow/signal_with_start_workflow_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"go.temporal.io/server/service/history/shard"
4646
"go.temporal.io/server/service/history/tests"
4747
"go.temporal.io/server/service/history/workflow"
48+
wcache "go.temporal.io/server/service/history/workflow/cache"
4849
)
4950

5051
type (
@@ -111,7 +112,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() {
111112
ctx := context.Background()
112113
currentWorkflowContext := api.NewWorkflowContext(
113114
s.currentContext,
114-
workflow.NoopReleaseFn,
115+
wcache.NoopReleaseFn,
115116
s.currentMutableState,
116117
)
117118
request := s.randomRequest()
@@ -131,7 +132,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
131132
ctx := context.Background()
132133
currentWorkflowContext := api.NewWorkflowContext(
133134
s.currentContext,
134-
workflow.NoopReleaseFn,
135+
wcache.NoopReleaseFn,
135136
s.currentMutableState,
136137
)
137138
request := s.randomRequest()
@@ -161,7 +162,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
161162
ctx := context.Background()
162163
currentWorkflowContext := api.NewWorkflowContext(
163164
s.currentContext,
164-
workflow.NoopReleaseFn,
165+
wcache.NoopReleaseFn,
165166
s.currentMutableState,
166167
)
167168
request := s.randomRequest()

service/history/api/workflow_context.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828
"go.temporal.io/server/common/definition"
2929
"go.temporal.io/server/common/namespace"
3030
"go.temporal.io/server/service/history/workflow"
31+
wcache "go.temporal.io/server/service/history/workflow/cache"
3132
)
3233

3334
type WorkflowContext interface {
3435
GetContext() workflow.Context
3536
GetMutableState() workflow.MutableState
36-
GetReleaseFn() workflow.ReleaseCacheFunc
37+
GetReleaseFn() wcache.ReleaseCacheFunc
3738

3839
GetNamespaceEntry() *namespace.Namespace
3940
GetWorkflowKey() definition.WorkflowKey
@@ -42,7 +43,7 @@ type WorkflowContext interface {
4243
type WorkflowContextImpl struct {
4344
context workflow.Context
4445
mutableState workflow.MutableState
45-
releaseFn workflow.ReleaseCacheFunc
46+
releaseFn wcache.ReleaseCacheFunc
4647
}
4748

4849
type UpdateWorkflowAction struct {
@@ -65,7 +66,7 @@ var _ WorkflowContext = (*WorkflowContextImpl)(nil)
6566

6667
func NewWorkflowContext(
6768
context workflow.Context,
68-
releaseFn workflow.ReleaseCacheFunc,
69+
releaseFn wcache.ReleaseCacheFunc,
6970
mutableState workflow.MutableState,
7071
) *WorkflowContextImpl {
7172

@@ -84,7 +85,7 @@ func (w *WorkflowContextImpl) GetMutableState() workflow.MutableState {
8485
return w.mutableState
8586
}
8687

87-
func (w *WorkflowContextImpl) GetReleaseFn() workflow.ReleaseCacheFunc {
88+
func (w *WorkflowContextImpl) GetReleaseFn() wcache.ReleaseCacheFunc {
8889
return w.releaseFn
8990
}
9091

service/history/workflow/delete_manager.go service/history/deletemanager/delete_manager.go

+21-19
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination delete_manager_mock.go
2626

27-
package workflow
27+
package deletemanager
2828

2929
import (
3030
"context"
@@ -44,6 +44,8 @@ import (
4444
"go.temporal.io/server/service/history/configs"
4545
"go.temporal.io/server/service/history/shard"
4646
"go.temporal.io/server/service/history/tasks"
47+
"go.temporal.io/server/service/history/workflow"
48+
wcache "go.temporal.io/server/service/history/workflow/cache"
4749
"go.temporal.io/server/service/worker/archiver"
4850
)
4951

@@ -53,32 +55,32 @@ type (
5355
ctx context.Context,
5456
nsID namespace.ID,
5557
we commonpb.WorkflowExecution,
56-
ms MutableState,
58+
ms workflow.MutableState,
5759
workflowClosedVersion int64,
5860
) error
5961
DeleteWorkflowExecution(
6062
ctx context.Context,
6163
nsID namespace.ID,
6264
we commonpb.WorkflowExecution,
63-
weCtx Context,
64-
ms MutableState,
65+
weCtx workflow.Context,
66+
ms workflow.MutableState,
6567
forceDeleteFromOpenVisibility bool,
6668
stage *tasks.DeleteWorkflowExecutionStage,
6769
) error
6870
DeleteWorkflowExecutionByRetention(
6971
ctx context.Context,
7072
nsID namespace.ID,
7173
we commonpb.WorkflowExecution,
72-
weCtx Context,
73-
ms MutableState,
74+
weCtx workflow.Context,
75+
ms workflow.MutableState,
7476
archiveIfEnabled bool,
7577
stage *tasks.DeleteWorkflowExecutionStage,
7678
) error
7779
}
7880

7981
DeleteManagerImpl struct {
8082
shard shard.Context
81-
historyCache Cache
83+
workflowCache wcache.Cache
8284
config *configs.Config
8385
metricsHandler metrics.MetricsHandler
8486
archivalClient archiver.Client
@@ -90,14 +92,14 @@ var _ DeleteManager = (*DeleteManagerImpl)(nil)
9092

9193
func NewDeleteManager(
9294
shard shard.Context,
93-
cache Cache,
95+
cache wcache.Cache,
9496
config *configs.Config,
9597
archiverClient archiver.Client,
9698
timeSource clock.TimeSource,
9799
) *DeleteManagerImpl {
98100
deleteManager := &DeleteManagerImpl{
99101
shard: shard,
100-
historyCache: cache,
102+
workflowCache: cache,
101103
metricsHandler: shard.GetMetricsHandler(),
102104
config: config,
103105
archivalClient: archiverClient,
@@ -111,11 +113,11 @@ func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
111113
ctx context.Context,
112114
nsID namespace.ID,
113115
we commonpb.WorkflowExecution,
114-
ms MutableState,
116+
ms workflow.MutableState,
115117
workflowClosedVersion int64,
116118
) error {
117119

118-
taskGenerator := taskGeneratorProvider.NewTaskGenerator(m.shard, ms)
120+
taskGenerator := workflow.NewTaskGeneratorProvider().NewTaskGenerator(m.shard, ms)
119121

120122
// We can make this task immediately because the task itself will keep rescheduling itself until the workflow is
121123
// closed before actually deleting the workflow.
@@ -141,8 +143,8 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
141143
ctx context.Context,
142144
nsID namespace.ID,
143145
we commonpb.WorkflowExecution,
144-
weCtx Context,
145-
ms MutableState,
146+
weCtx workflow.Context,
147+
ms workflow.MutableState,
146148
forceDeleteFromOpenVisibility bool,
147149
stage *tasks.DeleteWorkflowExecutionStage,
148150
) error {
@@ -164,8 +166,8 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
164166
ctx context.Context,
165167
nsID namespace.ID,
166168
we commonpb.WorkflowExecution,
167-
weCtx Context,
168-
ms MutableState,
169+
weCtx workflow.Context,
170+
ms workflow.MutableState,
169171
archiveIfEnabled bool,
170172
stage *tasks.DeleteWorkflowExecutionStage,
171173
) error {
@@ -187,8 +189,8 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
187189
ctx context.Context,
188190
namespaceID namespace.ID,
189191
we commonpb.WorkflowExecution,
190-
weCtx Context,
191-
ms MutableState,
192+
weCtx workflow.Context,
193+
ms workflow.MutableState,
192194
archiveIfEnabled bool,
193195
forceDeleteFromOpenVisibility bool,
194196
stage *tasks.DeleteWorkflowExecutionStage,
@@ -264,8 +266,8 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled(
264266
namespaceID namespace.ID,
265267
workflowExecution commonpb.WorkflowExecution,
266268
currentBranchToken []byte,
267-
weCtx Context,
268-
ms MutableState,
269+
weCtx workflow.Context,
270+
ms workflow.MutableState,
269271
metricsHandler metrics.MetricsHandler,
270272
) (deletionPromised bool, err error) {
271273

service/history/workflow/delete_manager_mock.go service/history/deletemanager/delete_manager_mock.go

+7-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)