Skip to content

Commit 1f85fdc

Browse files
authored
feat(pubsublite): allow increasing the number of topic partitions (#3647)
Allows increasing the number of partitions when updating a topic config. Updates documentation, examples and integration tests.
1 parent 0da3578 commit 1f85fdc

7 files changed

+73
-55
lines changed

pubsublite/config.go

+10-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ type TopicConfig struct {
4646
// about valid topic IDs.
4747
Name string
4848

49-
// The number of partitions in the topic. Must be at least 1. Cannot be
50-
// changed after creation.
49+
// The number of partitions in the topic. Must be at least 1. Can be increased
50+
// after creation, but not decreased.
5151
PartitionCount int
5252

5353
// Publish throughput capacity per partition in MiB/s.
@@ -120,6 +120,10 @@ type TopicConfigToUpdate struct {
120120
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". Required.
121121
Name string
122122

123+
// If non-zero, will update the number of partitions in the topic. The number
124+
// of partitions can only be increased, not decreased.
125+
PartitionCount int
126+
123127
// If non-zero, will update the publish throughput capacity per partition.
124128
PublishCapacityMiBPerSec int
125129

@@ -139,6 +143,7 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
139143
updatedTopic := &pb.Topic{
140144
Name: tc.Name,
141145
PartitionConfig: &pb.Topic_PartitionConfig{
146+
Count: int64(tc.PartitionCount),
142147
Dimension: &pb.Topic_PartitionConfig_Capacity_{
143148
Capacity: &pb.Topic_PartitionConfig_Capacity{
144149
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
@@ -152,6 +157,9 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
152157
}
153158

154159
var fields []string
160+
if tc.PartitionCount > 0 {
161+
fields = append(fields, "partition_config.count")
162+
}
155163
if tc.PublishCapacityMiBPerSec > 0 {
156164
fields = append(fields, "partition_config.capacity.publish_mib_per_sec")
157165
}

pubsublite/config_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ func TestTopicUpdateRequest(t *testing.T) {
112112
desc: "all fields set",
113113
config: &TopicConfigToUpdate{
114114
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
115+
PartitionCount: 2,
115116
PublishCapacityMiBPerSec: 4,
116117
SubscribeCapacityMiBPerSec: 12,
117118
PerPartitionBytes: 500000,
@@ -121,6 +122,7 @@ func TestTopicUpdateRequest(t *testing.T) {
121122
Topic: &pb.Topic{
122123
Name: "projects/my-proj/locations/us-central1-c/topics/my-topic",
123124
PartitionConfig: &pb.Topic_PartitionConfig{
125+
Count: 2,
124126
Dimension: &pb.Topic_PartitionConfig_Capacity_{
125127
Capacity: &pb.Topic_PartitionConfig_Capacity{
126128
PublishMibPerSec: 4,
@@ -135,6 +137,7 @@ func TestTopicUpdateRequest(t *testing.T) {
135137
},
136138
UpdateMask: &fmpb.FieldMask{
137139
Paths: []string{
140+
"partition_config.count",
138141
"partition_config.capacity.publish_mib_per_sec",
139142
"partition_config.capacity.subscribe_mib_per_sec",
140143
"retention_config.per_partition_bytes",
@@ -187,7 +190,7 @@ func TestTopicUpdateRequest(t *testing.T) {
187190
} {
188191
t.Run(tc.desc, func(t *testing.T) {
189192
if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) {
190-
t.Errorf("TopicConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
193+
t.Errorf("TopicConfigToUpdate(%v).toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want)
191194
}
192195
})
193196
}

pubsublite/example_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ func ExampleAdminClient_UpdateTopic() {
6161

6262
updateConfig := pubsublite.TopicConfigToUpdate{
6363
Name: "projects/my-project/locations/zone/topics/my-topic",
64+
PartitionCount: 3, // Only increases currently supported.
6465
PublishCapacityMiBPerSec: 8,
6566
SubscribeCapacityMiBPerSec: 16,
66-
// Garbage collect messages older than 24 hours.
67-
RetentionDuration: 24 * time.Hour,
67+
RetentionDuration: 24 * time.Hour, // Garbage collect messages older than 24 hours.
6868
}
6969
_, err = admin.UpdateTopic(ctx, updateConfig)
7070
if err != nil {

0 commit comments

Comments
 (0)