Skip to content

Commit 52d1ba0

Browse files
authored
Advanced visibility for PostgreSQL (#3896)
1 parent f94e575 commit 52d1ba0

File tree

5 files changed

+439
-2
lines changed

5 files changed

+439
-2
lines changed

common/persistence/sql/sqlplugin/postgresql/typeconv.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (c *converter) ToPostgreSQLDateTime(t time.Time) time.Time {
4545
if t.IsZero() {
4646
return minPostgreSQLDateTime
4747
}
48-
return t.UTC()
48+
return t.UTC().Truncate(time.Microsecond)
4949
}
5050

5151
// FromPostgreSQLDateTime converts postgresql datetime and returns go time
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package postgresql
26+
27+
import (
28+
"context"
29+
"database/sql"
30+
"fmt"
31+
"strings"
32+
33+
"go.temporal.io/server/common/persistence/sql/sqlplugin"
34+
)
35+
36+
var (
37+
templateInsertWorkflowExecution = fmt.Sprintf(
38+
`INSERT INTO executions_visibility (%s)
39+
VALUES (%s)
40+
ON CONFLICT (namespace_id, run_id) DO NOTHING`,
41+
strings.Join(sqlplugin.DbFields, ", "),
42+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
43+
)
44+
45+
templateUpsertWorkflowExecution = fmt.Sprintf(
46+
`INSERT INTO executions_visibility (%s)
47+
VALUES (%s)
48+
%s`,
49+
strings.Join(sqlplugin.DbFields, ", "),
50+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
51+
buildOnDuplicateKeyUpdate(sqlplugin.DbFields...),
52+
)
53+
54+
templateDeleteWorkflowExecution_v12 = `
55+
DELETE FROM executions_visibility
56+
WHERE namespace_id = :namespace_id AND run_id = :run_id`
57+
58+
templateGetWorkflowExecution_v12 = fmt.Sprintf(
59+
`SELECT %s FROM executions_visibility
60+
WHERE namespace_id = :namespace_id AND run_id = :run_id`,
61+
strings.Join(sqlplugin.DbFields, ", "),
62+
)
63+
)
64+
65+
func buildOnDuplicateKeyUpdate(fields ...string) string {
66+
items := make([]string, len(fields))
67+
for i, field := range fields {
68+
items[i] = fmt.Sprintf("%s = excluded.%s", field, field)
69+
}
70+
return fmt.Sprintf(
71+
"ON CONFLICT (namespace_id, run_id) DO UPDATE SET %s",
72+
strings.Join(items, ", "),
73+
)
74+
}
75+
76+
// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
77+
// its left as such and no update will be made
78+
func (pdb *dbV12) InsertIntoVisibility(
79+
ctx context.Context,
80+
row *sqlplugin.VisibilityRow,
81+
) (sql.Result, error) {
82+
finalRow := pdb.prepareRowForDB(row)
83+
return pdb.conn.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow)
84+
}
85+
86+
// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
87+
func (pdb *dbV12) ReplaceIntoVisibility(
88+
ctx context.Context,
89+
row *sqlplugin.VisibilityRow,
90+
) (sql.Result, error) {
91+
finalRow := pdb.prepareRowForDB(row)
92+
return pdb.conn.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow)
93+
}
94+
95+
// DeleteFromVisibility deletes a row from visibility table if it exist
96+
func (pdb *dbV12) DeleteFromVisibility(
97+
ctx context.Context,
98+
filter sqlplugin.VisibilityDeleteFilter,
99+
) (sql.Result, error) {
100+
return pdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution_v12, filter)
101+
}
102+
103+
// SelectFromVisibility reads one or more rows from visibility table
104+
func (pdb *dbV12) SelectFromVisibility(
105+
ctx context.Context,
106+
filter sqlplugin.VisibilitySelectFilter,
107+
) ([]sqlplugin.VisibilityRow, error) {
108+
if len(filter.Query) == 0 {
109+
// backward compatibility for existing tests
110+
err := sqlplugin.GenerateSelectQuery(&filter, pdb.converter.ToPostgreSQLDateTime)
111+
if err != nil {
112+
return nil, err
113+
}
114+
}
115+
116+
// Rebind will replace default placeholder `?` with the right placeholder for PostgreSQL.
117+
filter.Query = pdb.db.db.Rebind(filter.Query)
118+
var rows []sqlplugin.VisibilityRow
119+
err := pdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...)
120+
if err != nil {
121+
return nil, err
122+
}
123+
for i := range rows {
124+
err = pdb.processRowFromDB(&rows[i])
125+
if err != nil {
126+
return nil, err
127+
}
128+
}
129+
return rows, nil
130+
}
131+
132+
// GetFromVisibility reads one row from visibility table
133+
func (pdb *dbV12) GetFromVisibility(
134+
ctx context.Context,
135+
filter sqlplugin.VisibilityGetFilter,
136+
) (*sqlplugin.VisibilityRow, error) {
137+
var row sqlplugin.VisibilityRow
138+
stmt, err := pdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution_v12)
139+
if err != nil {
140+
return nil, err
141+
}
142+
err = stmt.GetContext(ctx, &row, filter)
143+
if err != nil {
144+
return nil, err
145+
}
146+
err = pdb.processRowFromDB(&row)
147+
if err != nil {
148+
return nil, err
149+
}
150+
return &row, nil
151+
}
152+
153+
func (pdb *dbV12) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
154+
if row == nil {
155+
return nil
156+
}
157+
finalRow := *row
158+
finalRow.StartTime = pdb.converter.ToPostgreSQLDateTime(finalRow.StartTime)
159+
finalRow.ExecutionTime = pdb.converter.ToPostgreSQLDateTime(finalRow.ExecutionTime)
160+
if finalRow.CloseTime != nil {
161+
*finalRow.CloseTime = pdb.converter.ToPostgreSQLDateTime(*finalRow.CloseTime)
162+
}
163+
return &finalRow
164+
}
165+
166+
func (pdb *dbV12) processRowFromDB(row *sqlplugin.VisibilityRow) error {
167+
if row == nil {
168+
return nil
169+
}
170+
row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime)
171+
row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime)
172+
if row.CloseTime != nil {
173+
closeTime := pdb.converter.FromPostgreSQLDateTime(*row.CloseTime)
174+
row.CloseTime = &closeTime
175+
}
176+
if row.SearchAttributes != nil {
177+
for saName, saValue := range *row.SearchAttributes {
178+
switch typedSaValue := saValue.(type) {
179+
case []interface{}:
180+
// the only valid type is slice of strings
181+
strSlice := make([]string, len(typedSaValue))
182+
for i, item := range typedSaValue {
183+
switch v := item.(type) {
184+
case string:
185+
strSlice[i] = v
186+
default:
187+
return fmt.Errorf("Unexpected data type in keyword list: %T (expected string)", v)
188+
}
189+
}
190+
(*row.SearchAttributes)[saName] = strSlice
191+
default:
192+
// no-op
193+
}
194+
}
195+
}
196+
// need to trim the run ID, or otherwise the returned value will
197+
// come with lots of trailing spaces, probably due to the CHAR(64) type
198+
row.RunID = strings.TrimSpace(row.RunID)
199+
return nil
200+
}

common/persistence/visibility/factory.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"go.temporal.io/server/common/log"
3131
"go.temporal.io/server/common/metrics"
3232
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
33+
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
3334
"go.temporal.io/server/common/persistence/visibility/manager"
3435
"go.temporal.io/server/common/persistence/visibility/store"
3536
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
@@ -282,7 +283,7 @@ func newStandardVisibilityStore(
282283
isStandard = true
283284
case visibilityStoreCfg.SQL != nil:
284285
switch visibilityStoreCfg.SQL.PluginName {
285-
case mysql.PluginNameV8:
286+
case mysql.PluginNameV8, postgresql.PluginNameV12:
286287
isStandard = false
287288
visStore, err = sql.NewSQLVisibilityStore(
288289
*visibilityStoreCfg.SQL,

common/persistence/visibility/store/sql/query_converter_factory.go

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package sql
2626

2727
import (
2828
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
29+
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
2930
"go.temporal.io/server/common/persistence/visibility/manager"
3031
"go.temporal.io/server/common/searchattribute"
3132
)
@@ -39,6 +40,8 @@ func NewQueryConverter(
3940
switch pluginName {
4041
case mysql.PluginNameV8:
4142
return newMySQLQueryConverter(request, saTypeMap, saMapper)
43+
case postgresql.PluginNameV12:
44+
return newPostgreSQLQueryConverter(request, saTypeMap, saMapper)
4245
default:
4346
return nil
4447
}

0 commit comments

Comments
 (0)