Skip to content

Commit 391ed61

Browse files
authored
Factor out task queue name mangling into new package (#3549)
1 parent de6f2df commit 391ed61

15 files changed

+418
-249
lines changed

client/matching/loadbalancer.go

+8-20
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@
2525
package matching
2626

2727
import (
28-
"fmt"
2928
"math/rand"
30-
"strings"
3129

3230
enumspb "go.temporal.io/api/enums/v1"
3331
taskqueuepb "go.temporal.io/api/taskqueue/v1"
3432

3533
"go.temporal.io/server/common/dynamicconfig"
3634
"go.temporal.io/server/common/namespace"
35+
"go.temporal.io/server/common/tqname"
36+
"go.temporal.io/server/common/util"
3737
)
3838

3939
type (
@@ -72,10 +72,6 @@ type (
7272
}
7373
)
7474

75-
const (
76-
taskQueuePartitionPrefix = "/_sys/"
77-
)
78-
7975
// NewLoadBalancer returns an instance of matching load balancer that
8076
// can help distribute api calls across task queue partitions
8177
func NewLoadBalancer(
@@ -114,30 +110,22 @@ func (lb *defaultLoadBalancer) pickPartition(
114110
forwardedFrom string,
115111
nPartitions dynamicconfig.IntPropertyFnWithTaskQueueInfoFilters,
116112
) string {
117-
118113
if forwardedFrom != "" || taskQueue.GetKind() == enumspb.TASK_QUEUE_KIND_STICKY {
119114
return taskQueue.GetName()
120115
}
121116

122-
if strings.HasPrefix(taskQueue.GetName(), taskQueuePartitionPrefix) {
123-
// this should never happen when forwardedFrom is empty
124-
return taskQueue.GetName()
125-
}
117+
tqName, err := tqname.FromBaseName(taskQueue.GetName())
126118

127-
namespace, err := lb.namespaceIDToName(namespaceID)
119+
// this should never happen when forwardedFrom is empty
128120
if err != nil {
129121
return taskQueue.GetName()
130122
}
131123

132-
n := nPartitions(namespace.String(), taskQueue.GetName(), taskQueueType)
133-
if n <= 0 {
134-
return taskQueue.GetName()
135-
}
136-
137-
p := rand.Intn(n)
138-
if p == 0 {
124+
nsName, err := lb.namespaceIDToName(namespaceID)
125+
if err != nil {
139126
return taskQueue.GetName()
140127
}
141128

142-
return fmt.Sprintf("%v%v/%v", taskQueuePartitionPrefix, taskQueue.GetName(), p)
129+
n := util.Min(1, nPartitions(nsName.String(), tqName.BaseNameString(), taskQueueType))
130+
return tqName.WithPartition(rand.Intn(n)).FullName()
143131
}

client/matching/metric_client.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@ package matching
2626

2727
import (
2828
"context"
29-
"strings"
3029
"time"
3130

3231
"go.temporal.io/api/serviceerror"
3332
taskqueuepb "go.temporal.io/api/taskqueue/v1"
3433
"google.golang.org/grpc"
3534

36-
"go.temporal.io/server/common/headers"
37-
serviceerrors "go.temporal.io/server/common/serviceerror"
38-
3935
"go.temporal.io/server/api/matchingservice/v1"
36+
"go.temporal.io/server/common/headers"
4037
"go.temporal.io/server/common/log"
4138
"go.temporal.io/server/common/log/tag"
4239
"go.temporal.io/server/common/metrics"
40+
serviceerrors "go.temporal.io/server/common/serviceerror"
41+
"go.temporal.io/server/common/tqname"
4342
)
4443

4544
var _ matchingservice.MatchingServiceClient = (*metricClient)(nil)
@@ -179,12 +178,12 @@ func (c *metricClient) emitForwardedSourceStats(
179178
return
180179
}
181180

182-
isChildPartition := strings.HasPrefix(taskQueue.GetName(), taskQueuePartitionPrefix)
183181
switch {
184182
case forwardedFrom != "":
185183
metricsHandler.Counter(metrics.MatchingClientForwardedCounter.GetMetricName()).Record(1)
186184
default:
187-
if isChildPartition {
185+
_, err := tqname.FromBaseName(taskQueue.GetName())
186+
if err != nil {
188187
metricsHandler.Counter(metrics.MatchingClientInvalidTaskQueueName.GetMetricName()).Record(1)
189188
}
190189
}

common/tqname/tqname.go

+190
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package tqname
26+
27+
import (
28+
"errors"
29+
"fmt"
30+
"strconv"
31+
"strings"
32+
)
33+
34+
const (
35+
// mangledTaskQueuePrefix is the prefix for all mangled task queue names.
36+
mangledTaskQueuePrefix = "/_sys/"
37+
38+
suffixDelimiter = "/"
39+
versionSetDelimiter = ":"
40+
)
41+
42+
type (
43+
// Name encapsulates all the name mangling we do for task queues.
44+
//
45+
// Users work with "high-level task queues" and can give them arbitrary names (except for
46+
// our prefix).
47+
//
48+
// Each high-level task queue corresponds to one or more "low-level task queues", each of
49+
// which has a distinct task queue manager in memory in matching service, as well as a
50+
// distinct identity in persistence.
51+
//
52+
// There are two pieces of identifying information for low-level task queues: a partition
53+
// index, and a version set identifier. All low-level task queues have a partition index,
54+
// which may be 0. Partition 0 is called the "root". The version set identifier is
55+
// optional: task queues with it are called "versioned" and task queues without it are
56+
// called "unversioned".
57+
//
58+
// All versioned low-level task queues use mangled names. All unversioned low-level task
59+
// queues with non-zero partition also use mangled names.
60+
//
61+
// Mangled names look like this:
62+
//
63+
// /_sys/<base name>/[<version set id>:]<partition id>
64+
//
65+
// The partition id is required, and the version set id is optional. If present, it is
66+
// separated from the partition id by a colon. This scheme lets users use anything they
67+
// like for a base name, except for strings starting with "/_sys/", without ambiguity.
68+
//
69+
// For backward compatibility, unversioned low-level task queues with partition 0 do not
70+
// use mangled names, they use the bare base name.
71+
Name struct {
72+
baseName string // base name of the task queue as specified by user
73+
partition int // partition of task queue
74+
versionSet string // version set id
75+
}
76+
)
77+
78+
var (
79+
ErrNoParent = errors.New("root task queue partition has no parent")
80+
ErrInvalidDegree = errors.New("invalid task queue partition branching degree")
81+
)
82+
83+
// Parse takes a mangled low-level task queue name and returns a Name. Returns an error if the
84+
// given name is not a valid mangled name.
85+
func Parse(name string) (Name, error) {
86+
baseName := name
87+
partition := 0
88+
versionSet := ""
89+
90+
if strings.HasPrefix(name, mangledTaskQueuePrefix) {
91+
suffixOff := strings.LastIndex(name, suffixDelimiter)
92+
if suffixOff <= len(mangledTaskQueuePrefix) {
93+
return Name{}, fmt.Errorf("invalid task queue name %q", name)
94+
}
95+
baseName = name[len(mangledTaskQueuePrefix):suffixOff]
96+
97+
suffix := name[suffixOff+1:]
98+
if partitionOff := strings.LastIndex(suffix, versionSetDelimiter); partitionOff == 0 {
99+
return Name{}, fmt.Errorf("invalid task queue name %q", name)
100+
} else if partitionOff > 0 {
101+
// pull out version set
102+
versionSet, suffix = suffix[:partitionOff], suffix[partitionOff+1:]
103+
}
104+
105+
var err error
106+
partition, err = strconv.Atoi(suffix)
107+
if err != nil || partition < 0 || partition == 0 && len(versionSet) == 0 {
108+
return Name{}, fmt.Errorf("invalid task queue name %q", name)
109+
}
110+
}
111+
112+
return Name{
113+
baseName: baseName,
114+
partition: partition,
115+
versionSet: versionSet,
116+
}, nil
117+
}
118+
119+
// FromBaseName takes a base name and returns a Name. Returns an error if name looks like a
120+
// mangled name.
121+
func FromBaseName(name string) (Name, error) {
122+
if strings.HasPrefix(name, mangledTaskQueuePrefix) {
123+
return Name{}, fmt.Errorf("base name %q must not have prefix %q", name, mangledTaskQueuePrefix)
124+
}
125+
return Name{baseName: name}, nil
126+
}
127+
128+
// IsRoot returns true if this task queue is a root partition.
129+
func (tn Name) IsRoot() bool {
130+
return tn.partition == 0
131+
}
132+
133+
// WithPartition returns a new Name with the same base and version set but a different partition.
134+
func (tn Name) WithPartition(partition int) Name {
135+
nn := tn
136+
nn.partition = partition
137+
return nn
138+
}
139+
140+
// Root is shorthand for WithPartition(0).
141+
func (tn Name) Root() Name {
142+
return tn.WithPartition(0)
143+
}
144+
145+
// WithVersionSet returns a new Name with the same base and partition but a different version set.
146+
func (tn Name) WithVersionSet(versionSet string) Name {
147+
nn := tn
148+
nn.versionSet = versionSet
149+
return nn
150+
}
151+
152+
// BaseNameString returns the base name for a task queue. This should be used when looking up
153+
// settings in dynamic config, and pretty much nowhere else. To get the name of the root
154+
// partition, use tn.Root().FullName().
155+
func (tn Name) BaseNameString() string {
156+
return tn.baseName
157+
}
158+
159+
// Partition returns the partition number for a task queue.
160+
func (tn Name) Partition() int {
161+
return tn.partition
162+
}
163+
164+
// VersionSet returns the version set for a task queue.
165+
func (tn Name) VersionSet() string {
166+
return tn.versionSet
167+
}
168+
169+
// Parent returns a Name for the parent partition, using the given branching degree.
170+
func (tn Name) Parent(degree int) (Name, error) {
171+
if tn.IsRoot() {
172+
return Name{}, ErrNoParent
173+
} else if degree < 1 {
174+
return Name{}, ErrInvalidDegree
175+
}
176+
parent := (tn.partition+degree-1)/degree - 1
177+
return tn.WithPartition(parent), nil
178+
}
179+
180+
// FullName returns the mangled name of the task queue, to be used in RPCs and persistence.
181+
func (tn Name) FullName() string {
182+
if len(tn.versionSet) == 0 {
183+
if tn.partition == 0 {
184+
return tn.baseName
185+
}
186+
return fmt.Sprintf("%s%s%s%d", mangledTaskQueuePrefix, tn.baseName, suffixDelimiter, tn.partition)
187+
}
188+
// versioned always use prefix
189+
return fmt.Sprintf("%s%s%s%s%s%d", mangledTaskQueuePrefix, tn.baseName, suffixDelimiter, tn.versionSet, versionSetDelimiter, tn.partition)
190+
}

0 commit comments

Comments
 (0)