Skip to content

Commit e43e5b5

Browse files
authored
Advanced visibility for MySQL (#3878)
1 parent cf9e84a commit e43e5b5

16 files changed

+2140
-26
lines changed

common/persistence/sql/sqlplugin/interfaces.go

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"context"
2929
"database/sql"
3030

31+
"github.com/jmoiron/sqlx"
3132
"go.temporal.io/server/common/config"
3233
"go.temporal.io/server/common/resolver"
3334
)
@@ -131,5 +132,6 @@ type (
131132
NamedExecContext(ctx context.Context, query string, arg interface{}) (sql.Result, error)
132133
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
133134
SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
135+
PrepareNamedContext(ctx context.Context, query string) (*sqlx.NamedStmt, error)
134136
}
135137
)

common/persistence/sql/sqlplugin/mysql/typeconv.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (c *converter) ToMySQLDateTime(t time.Time) time.Time {
4545
if t.IsZero() {
4646
return minMySQLDateTime
4747
}
48-
return t.UTC()
48+
return t.UTC().Truncate(time.Microsecond)
4949
}
5050

5151
// FromMySQLDateTime converts mysql datetime and returns go time
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package mysql
26+
27+
import (
28+
"context"
29+
"database/sql"
30+
"errors"
31+
"fmt"
32+
"strings"
33+
34+
"go.temporal.io/server/common/persistence/sql/sqlplugin"
35+
)
36+
37+
var (
38+
templateInsertWorkflowExecution = fmt.Sprintf(
39+
`INSERT INTO executions_visibility (%s)
40+
VALUES (%s)
41+
ON DUPLICATE KEY UPDATE run_id = VALUES(run_id)`,
42+
strings.Join(sqlplugin.DbFields, ", "),
43+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
44+
)
45+
46+
templateInsertCustomSearchAttributes = `
47+
INSERT INTO custom_search_attributes (
48+
namespace_id, run_id, search_attributes
49+
) VALUES (:namespace_id, :run_id, :search_attributes)
50+
ON DUPLICATE KEY UPDATE run_id = VALUES(run_id)`
51+
52+
templateUpsertWorkflowExecution = fmt.Sprintf(
53+
`INSERT INTO executions_visibility (%s)
54+
VALUES (%s)
55+
%s`,
56+
strings.Join(sqlplugin.DbFields, ", "),
57+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
58+
buildOnDuplicateKeyUpdate(sqlplugin.DbFields...),
59+
)
60+
61+
templateUpsertCustomSearchAttributes = `
62+
INSERT INTO custom_search_attributes (
63+
namespace_id, run_id, search_attributes
64+
) VALUES (:namespace_id, :run_id, :search_attributes)
65+
ON DUPLICATE KEY UPDATE search_attributes = VALUES(search_attributes)`
66+
67+
templateDeleteWorkflowExecution_v8 = `
68+
DELETE FROM executions_visibility
69+
WHERE namespace_id = :namespace_id AND run_id = :run_id`
70+
71+
templateDeleteCustomSearchAttributes = `
72+
DELETE FROM custom_search_attributes
73+
WHERE namespace_id = :namespace_id AND run_id = :run_id`
74+
75+
templateGetWorkflowExecution_v8 = fmt.Sprintf(
76+
`SELECT %s FROM executions_visibility
77+
WHERE namespace_id = :namespace_id AND run_id = :run_id`,
78+
strings.Join(sqlplugin.DbFields, ", "),
79+
)
80+
)
81+
82+
func buildOnDuplicateKeyUpdate(fields ...string) string {
83+
items := make([]string, len(fields))
84+
for i, field := range fields {
85+
items[i] = fmt.Sprintf("%s = VALUES(%s)", field, field)
86+
}
87+
return fmt.Sprintf("ON DUPLICATE KEY UPDATE %s", strings.Join(items, ", "))
88+
}
89+
90+
// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
91+
// its left as such and no update will be made
92+
func (mdb *dbV8) InsertIntoVisibility(
93+
ctx context.Context,
94+
row *sqlplugin.VisibilityRow,
95+
) (result sql.Result, retError error) {
96+
finalRow := mdb.prepareRowForDB(row)
97+
tx, err := mdb.db.db.BeginTxx(ctx, nil)
98+
if err != nil {
99+
return nil, err
100+
}
101+
defer func() {
102+
err := tx.Rollback()
103+
// If the error is sql.ErrTxDone, it means the transaction already closed, so ignore error.
104+
if err != nil && !errors.Is(err, sql.ErrTxDone) {
105+
// Transaction rollback error should never happen, unless db connection was lost.
106+
retError = fmt.Errorf("transaction rollback failed: %w", retError)
107+
}
108+
}()
109+
result, err = tx.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow)
110+
if err != nil {
111+
return nil, err
112+
}
113+
_, err = tx.NamedExecContext(ctx, templateInsertCustomSearchAttributes, finalRow)
114+
if err != nil {
115+
return nil, err
116+
}
117+
err = tx.Commit()
118+
if err != nil {
119+
return nil, err
120+
}
121+
return result, nil
122+
}
123+
124+
// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
125+
func (mdb *dbV8) ReplaceIntoVisibility(
126+
ctx context.Context,
127+
row *sqlplugin.VisibilityRow,
128+
) (result sql.Result, retError error) {
129+
finalRow := mdb.prepareRowForDB(row)
130+
tx, err := mdb.db.db.BeginTxx(ctx, nil)
131+
if err != nil {
132+
return nil, err
133+
}
134+
defer func() {
135+
err := tx.Rollback()
136+
// If the error is sql.ErrTxDone, it means the transaction already closed, so ignore error.
137+
if err != nil && !errors.Is(err, sql.ErrTxDone) {
138+
// Transaction rollback error should never happen, unless db connection was lost.
139+
retError = fmt.Errorf("transaction rollback failed: %w", retError)
140+
}
141+
}()
142+
result, err = tx.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow)
143+
if err != nil {
144+
return nil, err
145+
}
146+
_, err = tx.NamedExecContext(ctx, templateUpsertCustomSearchAttributes, finalRow)
147+
if err != nil {
148+
return nil, err
149+
}
150+
err = tx.Commit()
151+
if err != nil {
152+
return nil, err
153+
}
154+
return result, nil
155+
}
156+
157+
// DeleteFromVisibility deletes a row from visibility table if it exist
158+
func (mdb *dbV8) DeleteFromVisibility(
159+
ctx context.Context,
160+
filter sqlplugin.VisibilityDeleteFilter,
161+
) (result sql.Result, retError error) {
162+
tx, err := mdb.db.db.BeginTxx(ctx, nil)
163+
if err != nil {
164+
return nil, err
165+
}
166+
defer func() {
167+
err := tx.Rollback()
168+
// If the error is sql.ErrTxDone, it means the transaction already closed, so ignore error.
169+
if err != nil && !errors.Is(err, sql.ErrTxDone) {
170+
// Transaction rollback error should never happen, unless db connection was lost.
171+
retError = fmt.Errorf("transaction rollback failed: %w", retError)
172+
}
173+
}()
174+
_, err = mdb.conn.NamedExecContext(ctx, templateDeleteCustomSearchAttributes, filter)
175+
if err != nil {
176+
return nil, err
177+
}
178+
result, err = mdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution_v8, filter)
179+
if err != nil {
180+
return nil, err
181+
}
182+
err = tx.Commit()
183+
if err != nil {
184+
return nil, err
185+
}
186+
return result, nil
187+
}
188+
189+
// SelectFromVisibility reads one or more rows from visibility table
190+
func (mdb *dbV8) SelectFromVisibility(
191+
ctx context.Context,
192+
filter sqlplugin.VisibilitySelectFilter,
193+
) ([]sqlplugin.VisibilityRow, error) {
194+
if len(filter.Query) == 0 {
195+
// backward compatibility for existing tests
196+
err := sqlplugin.GenerateSelectQuery(&filter, mdb.converter.ToMySQLDateTime)
197+
if err != nil {
198+
return nil, err
199+
}
200+
}
201+
202+
var rows []sqlplugin.VisibilityRow
203+
err := mdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...)
204+
if err != nil {
205+
return nil, err
206+
}
207+
for i := range rows {
208+
err = mdb.processRowFromDB(&rows[i])
209+
if err != nil {
210+
return nil, err
211+
}
212+
}
213+
return rows, nil
214+
}
215+
216+
// GetFromVisibility reads one row from visibility table
217+
func (mdb *dbV8) GetFromVisibility(
218+
ctx context.Context,
219+
filter sqlplugin.VisibilityGetFilter,
220+
) (*sqlplugin.VisibilityRow, error) {
221+
var row sqlplugin.VisibilityRow
222+
stmt, err := mdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution_v8)
223+
if err != nil {
224+
return nil, err
225+
}
226+
err = stmt.GetContext(ctx, &row, filter)
227+
if err != nil {
228+
return nil, err
229+
}
230+
err = mdb.processRowFromDB(&row)
231+
if err != nil {
232+
return nil, err
233+
}
234+
return &row, nil
235+
}
236+
237+
func (mdb *dbV8) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
238+
if row == nil {
239+
return nil
240+
}
241+
finalRow := *row
242+
finalRow.StartTime = mdb.converter.ToMySQLDateTime(finalRow.StartTime)
243+
finalRow.ExecutionTime = mdb.converter.ToMySQLDateTime(finalRow.ExecutionTime)
244+
if finalRow.CloseTime != nil {
245+
*finalRow.CloseTime = mdb.converter.ToMySQLDateTime(*finalRow.CloseTime)
246+
}
247+
return &finalRow
248+
}
249+
250+
func (mdb *dbV8) processRowFromDB(row *sqlplugin.VisibilityRow) error {
251+
if row == nil {
252+
return nil
253+
}
254+
row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime)
255+
row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime)
256+
if row.CloseTime != nil {
257+
closeTime := mdb.converter.FromMySQLDateTime(*row.CloseTime)
258+
row.CloseTime = &closeTime
259+
}
260+
if row.SearchAttributes != nil {
261+
for saName, saValue := range *row.SearchAttributes {
262+
switch typedSaValue := saValue.(type) {
263+
case []interface{}:
264+
// the only valid type is slice of strings
265+
strSlice := make([]string, len(typedSaValue))
266+
for i, item := range typedSaValue {
267+
switch v := item.(type) {
268+
case string:
269+
strSlice[i] = v
270+
default:
271+
return fmt.Errorf("Unexpected data type in keyword list: %T (expected string)", v)
272+
}
273+
}
274+
(*row.SearchAttributes)[saName] = strSlice
275+
default:
276+
// no-op
277+
}
278+
}
279+
}
280+
return nil
281+
}

common/persistence/sql/sqlplugin/tests/visibility.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func (s *visibilitySuite) TestReplaceSelect_Exists() {
289289
s.Equal([]sqlplugin.VisibilityRow{visibility}, rows)
290290
}
291291

292-
func (s *visibilitySuite) TestDeleteSelect() {
292+
func (s *visibilitySuite) TestDeleteGet() {
293293
namespaceID := primitives.NewUUID()
294294
runID := primitives.NewUUID()
295295

@@ -303,15 +303,15 @@ func (s *visibilitySuite) TestDeleteSelect() {
303303
s.NoError(err)
304304
s.Equal(0, int(rowsAffected))
305305

306-
selectFilter := sqlplugin.VisibilitySelectFilter{
306+
getFilter := sqlplugin.VisibilityGetFilter{
307307
NamespaceID: namespaceID.String(),
308-
RunID: convert.StringPtr(runID.String()),
308+
RunID: runID.String(),
309309
}
310-
_, err = s.store.SelectFromVisibility(newVisibilityContext(), selectFilter)
310+
_, err = s.store.GetFromVisibility(newVisibilityContext(), getFilter)
311311
s.Error(err) // TODO persistence layer should do proper error translation
312312
}
313313

314-
func (s *visibilitySuite) TestInsertDeleteSelect() {
314+
func (s *visibilitySuite) TestInsertDeleteGet() {
315315
namespaceID := primitives.NewUUID()
316316
runID := primitives.NewUUID()
317317
workflowTypeName := shuffle.String(testVisibilityWorkflowTypeName)
@@ -349,15 +349,15 @@ func (s *visibilitySuite) TestInsertDeleteSelect() {
349349
s.NoError(err)
350350
s.Equal(1, int(rowsAffected))
351351

352-
selectFilter := sqlplugin.VisibilitySelectFilter{
352+
getFilter := sqlplugin.VisibilityGetFilter{
353353
NamespaceID: namespaceID.String(),
354-
RunID: convert.StringPtr(runID.String()),
354+
RunID: runID.String(),
355355
}
356-
_, err = s.store.SelectFromVisibility(newVisibilityContext(), selectFilter)
356+
_, err = s.store.GetFromVisibility(newVisibilityContext(), getFilter)
357357
s.Error(err) // TODO persistence layer should do proper error translation
358358
}
359359

360-
func (s *visibilitySuite) TestReplaceDeleteSelect() {
360+
func (s *visibilitySuite) TestReplaceDeleteGet() {
361361
namespaceID := primitives.NewUUID()
362362
runID := primitives.NewUUID()
363363
workflowTypeName := shuffle.String(testVisibilityWorkflowTypeName)
@@ -395,11 +395,11 @@ func (s *visibilitySuite) TestReplaceDeleteSelect() {
395395
s.NoError(err)
396396
s.Equal(1, int(rowsAffected))
397397

398-
selectFilter := sqlplugin.VisibilitySelectFilter{
398+
getFilter := sqlplugin.VisibilityGetFilter{
399399
NamespaceID: namespaceID.String(),
400-
RunID: convert.StringPtr(runID.String()),
400+
RunID: runID.String(),
401401
}
402-
_, err = s.store.SelectFromVisibility(newVisibilityContext(), selectFilter)
402+
_, err = s.store.GetFromVisibility(newVisibilityContext(), getFilter)
403403
s.Error(err) // TODO persistence layer should do proper error translation
404404
}
405405

0 commit comments

Comments
 (0)