Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor history cache to its own package #3601

Merged
merged 7 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ import (
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/vclock"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool

WorkflowConsistencyChecker interface {
GetWorkflowCache() workflow.Cache
GetWorkflowCache() wcache.Cache
GetCurrentRunID(
ctx context.Context,
namespaceID string,
Expand All @@ -63,21 +64,21 @@ type (

WorkflowConsistencyCheckerImpl struct {
shardContext shard.Context
workflowCache workflow.Cache
workflowCache wcache.Cache
}
)

func NewWorkflowConsistencyChecker(
shardContext shard.Context,
workflowCache workflow.Cache,
workflowCache wcache.Cache,
) *WorkflowConsistencyCheckerImpl {
return &WorkflowConsistencyCheckerImpl{
shardContext: shardContext,
workflowCache: workflowCache,
}
}

func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() workflow.Cache {
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() wcache.Cache {
return c.workflowCache
}

Expand Down
5 changes: 3 additions & 2 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
Expand All @@ -54,7 +55,7 @@ type (

controller *gomock.Controller
shardContext *shard.MockContext
workflowCache *workflow.MockCache
workflowCache *wcache.MockCache

shardID int32
namespaceID string
Expand Down Expand Up @@ -82,7 +83,7 @@ func (s *workflowConsistencyCheckerSuite) SetupTest() {

s.controller = gomock.NewController(s.T())
s.shardContext = shard.NewMockContext(s.controller)
s.workflowCache = workflow.NewMockCache(s.controller)
s.workflowCache = wcache.NewMockCache(s.controller)

s.shardID = rand.Int31()
s.namespaceID = uuid.New().String()
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
Expand Down Expand Up @@ -117,7 +118,7 @@ func NewWorkflowWithSignal(
),
shard.GetLogger(),
)
return NewWorkflowContext(newWorkflowContext, workflow.NoopReleaseFn, newMutableState), nil
return NewWorkflowContext(newWorkflowContext, wcache.NoopReleaseFn, newMutableState), nil
}

func CreateMutableState(
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/deleteworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -43,7 +44,7 @@ func Invoke(
request *historyservice.DeleteWorkflowExecutionRequest,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
workflowDeleteManager workflow.DeleteManager,
workflowDeleteManager deletemanager.DeleteManager,
) (_ *historyservice.DeleteWorkflowExecutionResponse, retError error) {
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/reapplyevents/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/ndc"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

func Invoke(
Expand Down Expand Up @@ -156,7 +156,7 @@ func Invoke(
shard.GetClusterMetadata(),
context,
mutableState,
workflow.NoopReleaseFn,
wcache.NoopReleaseFn,
),
ndc.EventsReapplicationResetWorkflowReason,
toReapplyEvents,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type (
Expand Down Expand Up @@ -111,7 +112,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
wcache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand All @@ -131,7 +132,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
wcache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
wcache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand Down
9 changes: 5 additions & 4 deletions service/history/api/workflow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
)

type WorkflowContext interface {
GetContext() workflow.Context
GetMutableState() workflow.MutableState
GetReleaseFn() workflow.ReleaseCacheFunc
GetReleaseFn() wcache.ReleaseCacheFunc

GetNamespaceEntry() *namespace.Namespace
GetWorkflowKey() definition.WorkflowKey
Expand All @@ -42,7 +43,7 @@ type WorkflowContext interface {
type WorkflowContextImpl struct {
context workflow.Context
mutableState workflow.MutableState
releaseFn workflow.ReleaseCacheFunc
releaseFn wcache.ReleaseCacheFunc
}

type UpdateWorkflowAction struct {
Expand All @@ -65,7 +66,7 @@ var _ WorkflowContext = (*WorkflowContextImpl)(nil)

func NewWorkflowContext(
context workflow.Context,
releaseFn workflow.ReleaseCacheFunc,
releaseFn wcache.ReleaseCacheFunc,
mutableState workflow.MutableState,
) *WorkflowContextImpl {

Expand All @@ -84,7 +85,7 @@ func (w *WorkflowContextImpl) GetMutableState() workflow.MutableState {
return w.mutableState
}

func (w *WorkflowContextImpl) GetReleaseFn() workflow.ReleaseCacheFunc {
func (w *WorkflowContextImpl) GetReleaseFn() wcache.ReleaseCacheFunc {
return w.releaseFn
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination delete_manager_mock.go

package workflow
package deletemanager

import (
"context"
Expand All @@ -44,6 +44,8 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.temporal.io/server/service/worker/archiver"
)

Expand All @@ -53,32 +55,32 @@ type (
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
ms MutableState,
ms workflow.MutableState,
workflowClosedVersion int64,
) error
DeleteWorkflowExecution(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
forceDeleteFromOpenVisibility bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error
DeleteWorkflowExecutionByRetention(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error
}

DeleteManagerImpl struct {
shard shard.Context
historyCache Cache
workflowCache wcache.Cache
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi

config *configs.Config
metricsHandler metrics.MetricsHandler
archivalClient archiver.Client
Expand All @@ -90,14 +92,14 @@ var _ DeleteManager = (*DeleteManagerImpl)(nil)

func NewDeleteManager(
shard shard.Context,
cache Cache,
cache wcache.Cache,
config *configs.Config,
archiverClient archiver.Client,
timeSource clock.TimeSource,
) *DeleteManagerImpl {
deleteManager := &DeleteManagerImpl{
shard: shard,
historyCache: cache,
workflowCache: cache,
metricsHandler: shard.GetMetricsHandler(),
config: config,
archivalClient: archiverClient,
Expand All @@ -111,11 +113,11 @@ func (m *DeleteManagerImpl) AddDeleteWorkflowExecutionTask(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
ms MutableState,
ms workflow.MutableState,
workflowClosedVersion int64,
) error {

taskGenerator := taskGeneratorProvider.NewTaskGenerator(m.shard, ms)
taskGenerator := workflow.NewTaskGeneratorProvider().NewTaskGenerator(m.shard, ms)

// We can make this task immediately because the task itself will keep rescheduling itself until the workflow is
// closed before actually deleting the workflow.
Expand All @@ -141,8 +143,8 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecution(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
forceDeleteFromOpenVisibility bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error {
Expand All @@ -164,8 +166,8 @@ func (m *DeleteManagerImpl) DeleteWorkflowExecutionByRetention(
ctx context.Context,
nsID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
stage *tasks.DeleteWorkflowExecutionStage,
) error {
Expand All @@ -187,8 +189,8 @@ func (m *DeleteManagerImpl) deleteWorkflowExecutionInternal(
ctx context.Context,
namespaceID namespace.ID,
we commonpb.WorkflowExecution,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
archiveIfEnabled bool,
forceDeleteFromOpenVisibility bool,
stage *tasks.DeleteWorkflowExecutionStage,
Expand Down Expand Up @@ -264,8 +266,8 @@ func (m *DeleteManagerImpl) archiveWorkflowIfEnabled(
namespaceID namespace.ID,
workflowExecution commonpb.WorkflowExecution,
currentBranchToken []byte,
weCtx Context,
ms MutableState,
weCtx workflow.Context,
ms workflow.MutableState,
metricsHandler metrics.MetricsHandler,
) (deletionPromised bool, err error) {

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading