Skip to content

Commit 2117c81

Browse files
authored
Add integration tests for eager workflow start (#3928)
* wip * wip * wip
1 parent 1fb0697 commit 2117c81

File tree

2 files changed

+200
-0
lines changed

2 files changed

+200
-0
lines changed

tests/eager_workflow_start_test.go

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package tests
26+
27+
import (
28+
"fmt"
29+
"time"
30+
31+
"github.com/pborman/uuid"
32+
commandpb "go.temporal.io/api/command/v1"
33+
commonpb "go.temporal.io/api/common/v1"
34+
enumspb "go.temporal.io/api/enums/v1"
35+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
36+
"go.temporal.io/api/workflowservice/v1"
37+
"go.temporal.io/sdk/client"
38+
"go.temporal.io/sdk/converter"
39+
"go.temporal.io/server/common/primitives/timestamp"
40+
)
41+
42+
func (s *integrationSuite) defaultWorkflowID() string {
43+
return fmt.Sprintf("integration-%v", s.T().Name())
44+
}
45+
46+
func (s *integrationSuite) defaultTaskQueue() *taskqueuepb.TaskQueue {
47+
name := fmt.Sprintf("integration-queue-%v", s.T().Name())
48+
return &taskqueuepb.TaskQueue{Name: name, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
49+
}
50+
51+
func (s *integrationSuite) startEagerWorkflow(baseOptions workflowservice.StartWorkflowExecutionRequest) *workflowservice.StartWorkflowExecutionResponse {
52+
options := baseOptions
53+
options.RequestEagerExecution = true
54+
55+
if options.Namespace == "" {
56+
options.Namespace = s.namespace
57+
}
58+
if options.Identity == "" {
59+
options.Identity = "test"
60+
}
61+
if options.WorkflowId == "" {
62+
options.WorkflowId = s.defaultWorkflowID()
63+
}
64+
if options.WorkflowType == nil {
65+
options.WorkflowType = &commonpb.WorkflowType{Name: "Workflow"}
66+
}
67+
if options.TaskQueue == nil {
68+
options.TaskQueue = s.defaultTaskQueue()
69+
}
70+
if options.RequestId == "" {
71+
options.RequestId = uuid.New()
72+
}
73+
74+
response, err := s.engine.StartWorkflowExecution(NewContext(), &options)
75+
s.Require().NoError(err)
76+
77+
return response
78+
}
79+
80+
func (s *integrationSuite) respondWorkflowTaskCompleted(task *workflowservice.PollWorkflowTaskQueueResponse, result interface{}) {
81+
dataConverter := converter.GetDefaultDataConverter()
82+
payloads, err := dataConverter.ToPayloads(result)
83+
s.Require().NoError(err)
84+
completion := workflowservice.RespondWorkflowTaskCompletedRequest{
85+
Namespace: s.namespace,
86+
Identity: "test",
87+
TaskToken: task.TaskToken,
88+
Commands: []*commandpb.Command{{CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
89+
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
90+
Result: payloads,
91+
},
92+
}}},
93+
}
94+
_, err = s.engine.RespondWorkflowTaskCompleted(NewContext(), &completion)
95+
s.Require().NoError(err)
96+
}
97+
98+
func (s *integrationSuite) pollWorkflowTaskQueue() *workflowservice.PollWorkflowTaskQueueResponse {
99+
task, err := s.engine.PollWorkflowTaskQueue(NewContext(), &workflowservice.PollWorkflowTaskQueueRequest{
100+
Namespace: s.namespace,
101+
TaskQueue: s.defaultTaskQueue(),
102+
Identity: "test",
103+
})
104+
s.Require().NotNil(task, "PollWorkflowTaskQueue response was empty")
105+
s.Require().NoError(err)
106+
return task
107+
}
108+
109+
func (s *integrationSuite) getWorkflowStringResult(workflowID, runID string) string {
110+
hostPort := "127.0.0.1:7134"
111+
if TestFlags.FrontendAddr != "" {
112+
hostPort = TestFlags.FrontendAddr
113+
}
114+
c, err := client.Dial(client.Options{HostPort: hostPort, Namespace: s.namespace})
115+
s.Require().NoError(err)
116+
run := c.GetWorkflow(NewContext(), workflowID, runID)
117+
var result string
118+
err = run.Get(NewContext(), &result)
119+
s.Require().NoError(err)
120+
return result
121+
}
122+
123+
func (s *integrationSuite) TestEagerWorkflowStart_StartNew() {
124+
response := s.startEagerWorkflow(workflowservice.StartWorkflowExecutionRequest{})
125+
task := response.GetEagerWorkflowTask()
126+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
127+
s.respondWorkflowTaskCompleted(task, "ok")
128+
// Verify workflow completes and client can get the result
129+
result := s.getWorkflowStringResult(s.defaultWorkflowID(), response.RunId)
130+
s.Require().Equal("ok", result)
131+
}
132+
133+
func (s *integrationSuite) TestEagerWorkflowStart_RetryTaskAfterTimeout() {
134+
response := s.startEagerWorkflow(workflowservice.StartWorkflowExecutionRequest{
135+
// Should give enough grace time even in slow CI
136+
WorkflowTaskTimeout: timestamp.DurationPtr(2 * time.Second),
137+
})
138+
task := response.GetEagerWorkflowTask()
139+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
140+
// Let it timeout so it can be polled via standard matching based dispatch
141+
task = s.pollWorkflowTaskQueue()
142+
s.respondWorkflowTaskCompleted(task, "ok")
143+
// Verify workflow completes and client can get the result
144+
result := s.getWorkflowStringResult(s.defaultWorkflowID(), response.RunId)
145+
s.Require().Equal("ok", result)
146+
}
147+
148+
func (s *integrationSuite) TestEagerWorkflowStart_RetryStartAfterTimeout() {
149+
request := workflowservice.StartWorkflowExecutionRequest{
150+
// Should give enough grace time even in slow CI
151+
WorkflowTaskTimeout: timestamp.DurationPtr(2 * time.Second),
152+
RequestId: uuid.New(),
153+
}
154+
response := s.startEagerWorkflow(request)
155+
task := response.GetEagerWorkflowTask()
156+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
157+
158+
// Let it timeout
159+
time.Sleep(*request.WorkflowTaskTimeout)
160+
response = s.startEagerWorkflow(request)
161+
task = response.GetEagerWorkflowTask()
162+
s.Require().Nil(task, "StartWorkflowExecution response contained a workflow task")
163+
164+
task = s.pollWorkflowTaskQueue()
165+
s.respondWorkflowTaskCompleted(task, "ok")
166+
// Verify workflow completes and client can get the result
167+
result := s.getWorkflowStringResult(s.defaultWorkflowID(), response.RunId)
168+
s.Require().Equal("ok", result)
169+
}
170+
171+
func (s *integrationSuite) TestEagerWorkflowStart_RetryStartImmediately() {
172+
request := workflowservice.StartWorkflowExecutionRequest{RequestId: uuid.New()}
173+
response := s.startEagerWorkflow(request)
174+
task := response.GetEagerWorkflowTask()
175+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
176+
response = s.startEagerWorkflow(request)
177+
task = response.GetEagerWorkflowTask()
178+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
179+
180+
s.respondWorkflowTaskCompleted(task, "ok")
181+
// Verify workflow completes and client can get the result
182+
result := s.getWorkflowStringResult(s.defaultWorkflowID(), response.RunId)
183+
s.Require().Equal("ok", result)
184+
}
185+
186+
func (s *integrationSuite) TestEagerWorkflowStart_TerminateDuplicate() {
187+
request := workflowservice.StartWorkflowExecutionRequest{
188+
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING,
189+
}
190+
s.startEagerWorkflow(request)
191+
response := s.startEagerWorkflow(request)
192+
task := response.GetEagerWorkflowTask()
193+
s.Require().NotNil(task, "StartWorkflowExecution response did not contain a workflow task")
194+
195+
s.respondWorkflowTaskCompleted(task, "ok")
196+
// Verify workflow completes and client can get the result
197+
result := s.getWorkflowStringResult(s.defaultWorkflowID(), response.RunId)
198+
s.Require().Equal("ok", result)
199+
}

tests/integration_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type (
5252
func (s *integrationSuite) SetupSuite() {
5353
s.dynamicConfigOverrides = map[dynamicconfig.Key]interface{}{
5454
dynamicconfig.RetentionTimerJitterDuration: time.Second,
55+
dynamicconfig.EnableEagerWorkflowStart: true,
5556
}
5657
s.setupSuite("testdata/integration_test_cluster.yaml")
5758
}

0 commit comments

Comments
 (0)