Skip to content

Commit 8a71b94

Browse files
Add a utility for reliably fetching search attributes and memo (#3687)
1 parent eb88af4 commit 8a71b94

File tree

3 files changed

+267
-17
lines changed

3 files changed

+267
-17
lines changed

service/history/api/describeworkflow/api.go

+14-17
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,21 @@ import (
2929

3030
commonpb "go.temporal.io/api/common/v1"
3131
enumspb "go.temporal.io/api/enums/v1"
32+
"go.temporal.io/api/serviceerror"
3233
taskqueuepb "go.temporal.io/api/taskqueue/v1"
3334
workflowpb "go.temporal.io/api/workflow/v1"
3435

3536
enumsspb "go.temporal.io/server/api/enums/v1"
3637
"go.temporal.io/server/api/historyservice/v1"
3738
"go.temporal.io/server/common"
3839
"go.temporal.io/server/common/definition"
40+
"go.temporal.io/server/common/log/tag"
3941
"go.temporal.io/server/common/namespace"
4042
"go.temporal.io/server/common/persistence/visibility/manager"
4143
"go.temporal.io/server/common/primitives/timestamp"
4244
"go.temporal.io/server/service/history/api"
4345
"go.temporal.io/server/service/history/shard"
46+
"go.temporal.io/server/service/history/workflow"
4447
)
4548

4649
func Invoke(
@@ -192,25 +195,19 @@ func Invoke(
192195
}
193196
}
194197

195-
if executionInfo.CloseVisibilityTaskCompleted {
196-
// If close visibility task has completed, then search attributes and memo
197-
// were removed from mutable state, and we need to fetch from visibility.
198-
visResponse, err := persistenceVisibilityMgr.GetWorkflowExecution(
199-
ctx,
200-
&manager.GetWorkflowExecutionRequest{
201-
NamespaceID: namespaceID,
202-
Namespace: namespace.Name(req.Request.GetNamespace()),
203-
RunID: executionState.RunId,
204-
WorkflowID: executionInfo.WorkflowId,
205-
CloseTime: executionInfo.CloseTime,
206-
},
198+
relocatableAttributes, err := workflow.NewRelocatableAttributesFetcher(persistenceVisibilityMgr).Fetch(ctx, mutableState)
199+
if err != nil {
200+
shard.GetLogger().Error(
201+
"Failed to fetch relocatable attributes",
202+
tag.WorkflowNamespaceID(namespaceID.String()),
203+
tag.WorkflowID(executionInfo.WorkflowId),
204+
tag.WorkflowRunID(executionState.RunId),
205+
tag.Error(err),
207206
)
208-
if err != nil {
209-
return nil, err
210-
}
211-
result.WorkflowExecutionInfo.SearchAttributes = visResponse.Execution.SearchAttributes
212-
result.WorkflowExecutionInfo.Memo = visResponse.Execution.Memo
207+
return nil, serviceerror.NewInternal("Failed to fetch memo and search attributes")
213208
}
209+
result.WorkflowExecutionInfo.Memo = relocatableAttributes.Memo
210+
result.WorkflowExecutionInfo.SearchAttributes = relocatableAttributes.SearchAttributes
214211

215212
return result, nil
216213
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package workflow
26+
27+
import (
28+
"context"
29+
30+
commonpb "go.temporal.io/api/common/v1"
31+
32+
"go.temporal.io/server/common/persistence/visibility/manager"
33+
)
34+
35+
// RelocatableAttributesFetcher is used to fetch the relocatable attributes from the mutable state.
36+
// Relocatable attributes are attributes that can be moved from the mutable state to the persistence backend.
37+
type RelocatableAttributesFetcher interface {
38+
Fetch(
39+
ctx context.Context,
40+
mutableState MutableState,
41+
) (*RelocatableAttributes, error)
42+
}
43+
44+
// NewRelocatableAttributesFetcher creates a new instance of a RelocatableAttributesFetcher.
45+
// The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff
46+
// we already moved them there out from the mutable state.
47+
// The visibility manager is not used if the relocatable attributes are still in the mutable state.
48+
// We detect that the fields have moved by checking if the CloseExecutionVisibilityTask for this workflow execution is
49+
// marked as complete in the mutable state.
50+
// Because the relocatable fields that we push to persistence are never updated thereafter,
51+
// we may cache them on a per-workflow execution basis.
52+
// Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function
53+
// safely.
54+
// TODO: Add a cache around the visibility manager for the relocatable attributes.
55+
func NewRelocatableAttributesFetcher(
56+
visibilityManager manager.VisibilityManager,
57+
) RelocatableAttributesFetcher {
58+
return &relocatableAttributesFetcher{
59+
visibilityManager: visibilityManager,
60+
}
61+
}
62+
63+
// RelocatableAttributes contains workflow attributes that can be moved from the mutable state to the persistence
64+
// backend.
65+
type RelocatableAttributes struct {
66+
Memo *commonpb.Memo
67+
SearchAttributes *commonpb.SearchAttributes
68+
}
69+
70+
// relocatableAttributesFetcher is the default implementation of RelocatableAttributesFetcher.
71+
type relocatableAttributesFetcher struct {
72+
visibilityManager manager.VisibilityManager
73+
}
74+
75+
// Fetch fetches the relocatable attributes from the mutable state or the persistence backend.
76+
// First, it checks if the close visibility task is completed. If it is completed, then the relocatable attributes
77+
// are fetched from the persistence backend. Otherwise, the relocatable attributes are fetched from the mutable state.
78+
func (f *relocatableAttributesFetcher) Fetch(
79+
ctx context.Context,
80+
mutableState MutableState,
81+
) (*RelocatableAttributes, error) {
82+
executionInfo := mutableState.GetExecutionInfo()
83+
// If we haven't processed close visibility task yet, then we can fetch the search attributes and memo from the
84+
// mutable state.
85+
if !executionInfo.GetCloseVisibilityTaskCompleted() {
86+
return &RelocatableAttributes{
87+
Memo: &commonpb.Memo{Fields: executionInfo.Memo},
88+
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
89+
}, nil
90+
}
91+
92+
// If we have processed close visibility task, then we need to fetch the search attributes and memo from the
93+
// persistence backend because we have already deleted them from the mutable state.
94+
executionState := mutableState.GetExecutionState()
95+
visResponse, err := f.visibilityManager.GetWorkflowExecution(
96+
ctx,
97+
&manager.GetWorkflowExecutionRequest{
98+
NamespaceID: mutableState.GetNamespaceEntry().ID(),
99+
Namespace: mutableState.GetNamespaceEntry().Name(),
100+
RunID: executionState.GetRunId(),
101+
WorkflowID: executionInfo.GetWorkflowId(),
102+
CloseTime: executionInfo.CloseTime,
103+
},
104+
)
105+
if err != nil {
106+
return nil, err
107+
}
108+
return &RelocatableAttributes{
109+
Memo: visResponse.Execution.Memo,
110+
SearchAttributes: visResponse.Execution.SearchAttributes,
111+
}, nil
112+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package workflow
26+
27+
import (
28+
"context"
29+
"errors"
30+
"testing"
31+
"time"
32+
33+
"github.com/golang/mock/gomock"
34+
"github.com/stretchr/testify/assert"
35+
"github.com/stretchr/testify/require"
36+
"go.temporal.io/api/common/v1"
37+
"go.temporal.io/api/workflow/v1"
38+
39+
"go.temporal.io/server/api/persistence/v1"
40+
"go.temporal.io/server/common/persistence/visibility/manager"
41+
"go.temporal.io/server/service/history/tests"
42+
)
43+
44+
func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
45+
mutableStateAttributes := &RelocatableAttributes{
46+
Memo: &common.Memo{Fields: map[string]*common.Payload{
47+
"memoLocation": {Data: []byte("mutableState")},
48+
}},
49+
SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{
50+
"searchAttributesLocation": {Data: []byte("mutableState")},
51+
}},
52+
}
53+
persistenceAttributes := &RelocatableAttributes{
54+
Memo: &common.Memo{Fields: map[string]*common.Payload{
55+
"memoLocation": {Data: []byte("persistence")},
56+
}},
57+
SearchAttributes: &common.SearchAttributes{IndexedFields: map[string]*common.Payload{
58+
"searchAttributesLocation": {Data: []byte("persistence")},
59+
}},
60+
}
61+
require.NotEqual(t, mutableStateAttributes.Memo, persistenceAttributes.Memo)
62+
require.NotEqual(t, mutableStateAttributes.SearchAttributes, persistenceAttributes.SearchAttributes)
63+
testErr := errors.New("test error")
64+
for _, c := range []*struct {
65+
Name string
66+
CloseVisibilityTaskCompleted bool
67+
GetWorkflowExecutionErr error
68+
69+
ExpectedInfo *RelocatableAttributes
70+
ExpectedErr error
71+
}{
72+
{
73+
Name: "CloseVisibilityTaskNotComplete",
74+
CloseVisibilityTaskCompleted: false,
75+
76+
ExpectedInfo: mutableStateAttributes,
77+
},
78+
{
79+
Name: "CloseVisibilityTaskCompleted",
80+
CloseVisibilityTaskCompleted: true,
81+
82+
ExpectedInfo: persistenceAttributes,
83+
},
84+
{
85+
Name: "GetWorkflowExecutionErr",
86+
CloseVisibilityTaskCompleted: true,
87+
GetWorkflowExecutionErr: testErr,
88+
89+
ExpectedErr: testErr,
90+
},
91+
} {
92+
c := c
93+
t.Run(c.Name, func(t *testing.T) {
94+
t.Parallel()
95+
closeTime := time.Unix(100, 0)
96+
executionInfo := &persistence.WorkflowExecutionInfo{
97+
Memo: mutableStateAttributes.Memo.Fields,
98+
SearchAttributes: mutableStateAttributes.SearchAttributes.IndexedFields,
99+
CloseVisibilityTaskCompleted: c.CloseVisibilityTaskCompleted,
100+
CloseTime: &closeTime,
101+
WorkflowId: tests.WorkflowID,
102+
}
103+
executionState := &persistence.WorkflowExecutionState{
104+
RunId: tests.RunID,
105+
}
106+
namespaceEntry := tests.GlobalNamespaceEntry
107+
ctrl := gomock.NewController(t)
108+
visibilityManager := manager.NewMockVisibilityManager(ctrl)
109+
mutableState := NewMockMutableState(ctrl)
110+
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
111+
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
112+
mutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes()
113+
if c.CloseVisibilityTaskCompleted {
114+
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), &manager.GetWorkflowExecutionRequest{
115+
NamespaceID: namespaceEntry.ID(),
116+
Namespace: namespaceEntry.Name(),
117+
RunID: tests.RunID,
118+
WorkflowID: tests.WorkflowID,
119+
CloseTime: &closeTime,
120+
}).Return(&manager.GetWorkflowExecutionResponse{
121+
Execution: &workflow.WorkflowExecutionInfo{
122+
Memo: persistenceAttributes.Memo,
123+
SearchAttributes: persistenceAttributes.SearchAttributes,
124+
},
125+
}, c.GetWorkflowExecutionErr)
126+
}
127+
ctx := context.Background()
128+
129+
fetcher := NewRelocatableAttributesFetcher(visibilityManager)
130+
info, err := fetcher.Fetch(ctx, mutableState)
131+
132+
if c.ExpectedErr != nil {
133+
require.Error(t, err)
134+
assert.ErrorIs(t, err, c.ExpectedErr)
135+
} else {
136+
require.NoError(t, err)
137+
assert.Equal(t, c.ExpectedInfo, info)
138+
}
139+
})
140+
}
141+
}

0 commit comments

Comments
 (0)