Skip to content

Commit 099cef3

Browse files
authored
Support CountWorkflowExecutions for SQL DB (#3955)
1 parent f4e4ea2 commit 099cef3

13 files changed

+229
-35
lines changed

common/persistence/sql/sqlplugin/mysql/visibility.go

+8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"fmt"
3232

3333
"go.temporal.io/server/common/persistence/sql/sqlplugin"
34+
"go.temporal.io/server/common/persistence/visibility/store"
3435
)
3536

3637
const (
@@ -297,6 +298,13 @@ func (mdb *db) GetFromVisibility(
297298
return &row, nil
298299
}
299300

301+
func (mdb *db) CountFromVisibility(
302+
ctx context.Context,
303+
filter sqlplugin.VisibilitySelectFilter,
304+
) (int64, error) {
305+
return 0, store.OperationNotSupportedErr
306+
}
307+
300308
func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
301309
row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime)
302310
row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime)

common/persistence/sql/sqlplugin/mysql/visibility_v8.go

+12
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,18 @@ func (mdb *dbV8) GetFromVisibility(
234234
return &row, nil
235235
}
236236

237+
func (mdb *dbV8) CountFromVisibility(
238+
ctx context.Context,
239+
filter sqlplugin.VisibilitySelectFilter,
240+
) (int64, error) {
241+
var count int64
242+
err := mdb.conn.GetContext(ctx, &count, filter.Query, filter.QueryArgs...)
243+
if err != nil {
244+
return 0, err
245+
}
246+
return count, nil
247+
}
248+
237249
func (mdb *dbV8) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
238250
if row == nil {
239251
return nil

common/persistence/sql/sqlplugin/postgresql/visibility.go

+8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"strings"
3333

3434
"go.temporal.io/server/common/persistence/sql/sqlplugin"
35+
"go.temporal.io/server/common/persistence/visibility/store"
3536
)
3637

3738
const (
@@ -316,6 +317,13 @@ func (pdb *db) GetFromVisibility(
316317
return &row, nil
317318
}
318319

320+
func (pdb *db) CountFromVisibility(
321+
ctx context.Context,
322+
filter sqlplugin.VisibilitySelectFilter,
323+
) (int64, error) {
324+
return 0, store.OperationNotSupportedErr
325+
}
326+
319327
func (pdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
320328
row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime)
321329
row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime)

common/persistence/sql/sqlplugin/postgresql/visibility_v12.go

+13
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,19 @@ func (pdb *dbV12) GetFromVisibility(
150150
return &row, nil
151151
}
152152

153+
func (pdb *dbV12) CountFromVisibility(
154+
ctx context.Context,
155+
filter sqlplugin.VisibilitySelectFilter,
156+
) (int64, error) {
157+
var count int64
158+
filter.Query = pdb.db.db.Rebind(filter.Query)
159+
err := pdb.conn.GetContext(ctx, &count, filter.Query, filter.QueryArgs...)
160+
if err != nil {
161+
return 0, err
162+
}
163+
return count, nil
164+
}
165+
153166
func (pdb *dbV12) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
154167
if row == nil {
155168
return nil

common/persistence/sql/sqlplugin/sqlite/visibility.go

+12
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,18 @@ func (mdb *db) GetFromVisibility(
152152
return &row, nil
153153
}
154154

155+
func (mdb *db) CountFromVisibility(
156+
ctx context.Context,
157+
filter sqlplugin.VisibilitySelectFilter,
158+
) (int64, error) {
159+
var count int64
160+
err := mdb.conn.GetContext(ctx, &count, filter.Query, filter.QueryArgs...)
161+
if err != nil {
162+
return 0, err
163+
}
164+
return count, nil
165+
}
166+
155167
func (mdb *db) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
156168
if row == nil {
157169
return nil

common/persistence/sql/sqlplugin/visibility.go

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ type (
105105
SelectFromVisibility(ctx context.Context, filter VisibilitySelectFilter) ([]VisibilityRow, error)
106106
GetFromVisibility(ctx context.Context, filter VisibilityGetFilter) (*VisibilityRow, error)
107107
DeleteFromVisibility(ctx context.Context, filter VisibilityDeleteFilter) (sql.Result, error)
108+
CountFromVisibility(ctx context.Context, filter VisibilitySelectFilter) (int64, error)
108109
}
109110
)
110111

common/persistence/visibility/store/sql/query_converter.go

+34-12
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
enumspb "go.temporal.io/api/enums/v1"
3737
"go.temporal.io/server/common/namespace"
3838
"go.temporal.io/server/common/persistence/sql/sqlplugin"
39-
"go.temporal.io/server/common/persistence/visibility/manager"
4039
"go.temporal.io/server/common/persistence/visibility/store/query"
4140
"go.temporal.io/server/common/searchattribute"
4241
)
@@ -54,16 +53,20 @@ type (
5453
token *pageToken,
5554
) (string, []any)
5655

56+
buildCountStmt(namespaceID namespace.ID, queryString string) (string, []any)
57+
5758
getDatetimeFormat() string
5859

5960
getCoalesceCloseTimeExpr() sqlparser.Expr
6061
}
6162

6263
QueryConverter struct {
6364
pluginQueryConverter
64-
request *manager.ListWorkflowExecutionsRequestV2
65-
saTypeMap searchattribute.NameTypeMap
66-
saMapper searchattribute.Mapper
65+
namespaceName namespace.Name
66+
namespaceID namespace.ID
67+
saTypeMap searchattribute.NameTypeMap
68+
saMapper searchattribute.Mapper
69+
queryString string
6770

6871
seenNamespaceDivision bool
6972
}
@@ -114,38 +117,57 @@ var (
114117

115118
func newQueryConverterInternal(
116119
pqc pluginQueryConverter,
117-
request *manager.ListWorkflowExecutionsRequestV2,
120+
namespaceName namespace.Name,
121+
namespaceID namespace.ID,
118122
saTypeMap searchattribute.NameTypeMap,
119123
saMapper searchattribute.Mapper,
124+
queryString string,
120125
) *QueryConverter {
121126
return &QueryConverter{
122127
pluginQueryConverter: pqc,
123-
request: request,
128+
namespaceName: namespaceName,
129+
namespaceID: namespaceID,
124130
saTypeMap: saTypeMap,
125131
saMapper: saMapper,
132+
queryString: queryString,
126133

127134
seenNamespaceDivision: false,
128135
}
129136
}
130137

131-
func (c *QueryConverter) BuildSelectStmt() (*sqlplugin.VisibilitySelectFilter, error) {
132-
token, err := deserializePageToken(c.request.NextPageToken)
138+
func (c *QueryConverter) BuildSelectStmt(
139+
pageSize int,
140+
nextPageToken []byte,
141+
) (*sqlplugin.VisibilitySelectFilter, error) {
142+
token, err := deserializePageToken(nextPageToken)
133143
if err != nil {
134144
return nil, err
135145
}
136-
queryString, err := c.convertWhereString(c.request.Query)
146+
queryString, err := c.convertWhereString(c.queryString)
137147
if err != nil {
138148
return nil, err
139149
}
140150
queryString, queryArgs := c.buildSelectStmt(
141-
c.request.NamespaceID,
151+
c.namespaceID,
142152
queryString,
143-
c.request.PageSize,
153+
pageSize,
144154
token,
145155
)
146156
return &sqlplugin.VisibilitySelectFilter{Query: queryString, QueryArgs: queryArgs}, nil
147157
}
148158

159+
func (c *QueryConverter) BuildCountStmt() (*sqlplugin.VisibilitySelectFilter, error) {
160+
queryString, err := c.convertWhereString(c.queryString)
161+
if err != nil {
162+
return nil, err
163+
}
164+
queryString, queryArgs := c.buildCountStmt(
165+
c.namespaceID,
166+
queryString,
167+
)
168+
return &sqlplugin.VisibilitySelectFilter{Query: queryString, QueryArgs: queryArgs}, nil
169+
}
170+
149171
func (c *QueryConverter) convertWhereString(queryString string) (string, error) {
150172
where := strings.TrimSpace(queryString)
151173
if where != "" && !strings.HasPrefix(strings.ToLower(where), "order by") {
@@ -376,7 +398,7 @@ func (c *QueryConverter) convertColName(
376398
saFieldName = saAlias
377399
if searchattribute.IsMappable(saAlias) {
378400
var err error
379-
saFieldName, err = c.saMapper.GetFieldName(saAlias, c.request.Namespace.String())
401+
saFieldName, err = c.saMapper.GetFieldName(saAlias, c.namespaceName.String())
380402
if err != nil {
381403
return "", "", err
382404
}

common/persistence/visibility/store/sql/query_converter_factory.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,28 @@
2525
package sql
2626

2727
import (
28+
"go.temporal.io/server/common/namespace"
2829
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
2930
"go.temporal.io/server/common/persistence/sql/sqlplugin/postgresql"
3031
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
31-
"go.temporal.io/server/common/persistence/visibility/manager"
3232
"go.temporal.io/server/common/searchattribute"
3333
)
3434

3535
func NewQueryConverter(
3636
pluginName string,
37-
request *manager.ListWorkflowExecutionsRequestV2,
37+
namespaceName namespace.Name,
38+
namespaceID namespace.ID,
3839
saTypeMap searchattribute.NameTypeMap,
3940
saMapper searchattribute.Mapper,
41+
queryString string,
4042
) *QueryConverter {
4143
switch pluginName {
4244
case mysql.PluginNameV8:
43-
return newMySQLQueryConverter(request, saTypeMap, saMapper)
45+
return newMySQLQueryConverter(namespaceName, namespaceID, saTypeMap, saMapper, queryString)
4446
case postgresql.PluginNameV12:
45-
return newPostgreSQLQueryConverter(request, saTypeMap, saMapper)
47+
return newPostgreSQLQueryConverter(namespaceName, namespaceID, saTypeMap, saMapper, queryString)
4648
case sqlite.PluginName:
47-
return newSqliteQueryConverter(request, saTypeMap, saMapper)
49+
return newSqliteQueryConverter(namespaceName, namespaceID, saTypeMap, saMapper, queryString)
4850
default:
4951
return nil
5052
}

common/persistence/visibility/store/sql/query_converter_mysql.go

+35-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"github.com/xwb1989/sqlparser"
3333
"go.temporal.io/server/common/namespace"
3434
"go.temporal.io/server/common/persistence/sql/sqlplugin"
35-
"go.temporal.io/server/common/persistence/visibility/manager"
3635
"go.temporal.io/server/common/persistence/visibility/store/query"
3736
"go.temporal.io/server/common/searchattribute"
3837
)
@@ -83,15 +82,19 @@ func (node *jsonOverlapsExpr) Format(buf *sqlparser.TrackedBuffer) {
8382
}
8483

8584
func newMySQLQueryConverter(
86-
request *manager.ListWorkflowExecutionsRequestV2,
85+
namespaceName namespace.Name,
86+
namespaceID namespace.ID,
8787
saTypeMap searchattribute.NameTypeMap,
8888
saMapper searchattribute.Mapper,
89+
queryString string,
8990
) *QueryConverter {
9091
return newQueryConverterInternal(
9192
&mysqlQueryConverter{},
92-
request,
93+
namespaceName,
94+
namespaceID,
9395
saTypeMap,
9496
saMapper,
97+
queryString,
9598
)
9699
}
97100

@@ -253,3 +256,32 @@ func (c *mysqlQueryConverter) buildSelectStmt(
253256
searchattribute.GetSqlDbColName(searchattribute.RunID),
254257
), queryArgs
255258
}
259+
260+
func (c *mysqlQueryConverter) buildCountStmt(
261+
namespaceID namespace.ID,
262+
queryString string,
263+
) (string, []any) {
264+
var whereClauses []string
265+
var queryArgs []any
266+
267+
whereClauses = append(
268+
whereClauses,
269+
fmt.Sprintf("(%s = ?)", searchattribute.GetSqlDbColName(searchattribute.NamespaceID)),
270+
)
271+
queryArgs = append(queryArgs, namespaceID.String())
272+
273+
if len(queryString) > 0 {
274+
whereClauses = append(whereClauses, fmt.Sprintf("(%s)", queryString))
275+
}
276+
277+
return fmt.Sprintf(
278+
`SELECT COUNT(1)
279+
FROM executions_visibility ev
280+
LEFT JOIN custom_search_attributes
281+
USING (%s, %s)
282+
WHERE %s`,
283+
searchattribute.GetSqlDbColName(searchattribute.NamespaceID),
284+
searchattribute.GetSqlDbColName(searchattribute.RunID),
285+
strings.Join(whereClauses, " AND "),
286+
), queryArgs
287+
}

common/persistence/visibility/store/sql/query_converter_postgresql.go

+29-3
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/xwb1989/sqlparser"
3232
"go.temporal.io/server/common/namespace"
3333
"go.temporal.io/server/common/persistence/sql/sqlplugin"
34-
"go.temporal.io/server/common/persistence/visibility/manager"
3534
"go.temporal.io/server/common/persistence/visibility/store/query"
3635
"go.temporal.io/server/common/searchattribute"
3736
)
@@ -64,15 +63,19 @@ func (node *pgCastExpr) Format(buf *sqlparser.TrackedBuffer) {
6463
}
6564

6665
func newPostgreSQLQueryConverter(
67-
request *manager.ListWorkflowExecutionsRequestV2,
66+
namespaceName namespace.Name,
67+
namespaceID namespace.ID,
6868
saTypeMap searchattribute.NameTypeMap,
6969
saMapper searchattribute.Mapper,
70+
queryString string,
7071
) *QueryConverter {
7172
return newQueryConverterInternal(
7273
&pgQueryConverter{},
73-
request,
74+
namespaceName,
75+
namespaceID,
7476
saTypeMap,
7577
saMapper,
78+
queryString,
7679
)
7780
}
7881

@@ -256,3 +259,26 @@ func (c *pgQueryConverter) buildSelectStmt(
256259
searchattribute.GetSqlDbColName(searchattribute.RunID),
257260
), queryArgs
258261
}
262+
263+
func (c *pgQueryConverter) buildCountStmt(
264+
namespaceID namespace.ID,
265+
queryString string,
266+
) (string, []any) {
267+
var whereClauses []string
268+
var queryArgs []any
269+
270+
whereClauses = append(
271+
whereClauses,
272+
fmt.Sprintf("(%s = ?)", searchattribute.GetSqlDbColName(searchattribute.NamespaceID)),
273+
)
274+
queryArgs = append(queryArgs, namespaceID.String())
275+
276+
if len(queryString) > 0 {
277+
whereClauses = append(whereClauses, fmt.Sprintf("(%s)", queryString))
278+
}
279+
280+
return fmt.Sprintf(
281+
"SELECT COUNT(1) FROM executions_visibility WHERE %s",
282+
strings.Join(whereClauses, " AND "),
283+
), queryArgs
284+
}

0 commit comments

Comments
 (0)