Skip to content

Commit e9bd1b8

Browse files
committed
fix(proto): handle V3 member metadata and empty owned partitions
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
1 parent 96c37d1 commit e9bd1b8

File tree

2 files changed

+69
-8
lines changed

2 files changed

+69
-8
lines changed

consumer_group_members.go

+52-8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ type ConsumerGroupMemberMetadata struct {
99
Topics []string
1010
UserData []byte
1111
OwnedPartitions []*OwnedPartition
12+
GenerationID int32
13+
RackID *string
1214
}
1315

1416
func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
@@ -22,6 +24,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
2224
return err
2325
}
2426

27+
if m.Version >= 1 {
28+
if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil {
29+
return err
30+
}
31+
for _, op := range m.OwnedPartitions {
32+
if err := op.encode(pe); err != nil {
33+
return err
34+
}
35+
}
36+
}
37+
38+
if m.Version >= 2 {
39+
pe.putInt32(m.GenerationID)
40+
}
41+
42+
if m.Version >= 3 {
43+
if err := pe.putNullableString(m.RackID); err != nil {
44+
return err
45+
}
46+
}
47+
2548
return nil
2649
}
2750

@@ -48,18 +71,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
4871
}
4972
return err
5073
}
51-
if n == 0 {
52-
return nil
53-
}
54-
m.OwnedPartitions = make([]*OwnedPartition, n)
55-
for i := 0; i < n; i++ {
56-
m.OwnedPartitions[i] = &OwnedPartition{}
57-
if err := m.OwnedPartitions[i].decode(pd); err != nil {
58-
return err
74+
if n > 0 {
75+
m.OwnedPartitions = make([]*OwnedPartition, n)
76+
for i := 0; i < n; i++ {
77+
m.OwnedPartitions[i] = &OwnedPartition{}
78+
if err := m.OwnedPartitions[i].decode(pd); err != nil {
79+
return err
80+
}
5981
}
6082
}
6183
}
6284

85+
if m.Version >= 2 {
86+
if m.GenerationID, err = pd.getInt32(); err != nil {
87+
return err
88+
}
89+
}
90+
91+
if m.Version >= 3 {
92+
if m.RackID, err = pd.getNullableString(); err != nil {
93+
return err
94+
}
95+
}
96+
6397
return nil
6498
}
6599

@@ -68,6 +102,16 @@ type OwnedPartition struct {
68102
Partitions []int32
69103
}
70104

105+
func (m *OwnedPartition) encode(pe packetEncoder) error {
106+
if err := pe.putString(m.Topic); err != nil {
107+
return err
108+
}
109+
if err := pe.putInt32Array(m.Partitions); err != nil {
110+
return err
111+
}
112+
return nil
113+
}
114+
71115
func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
72116
if m.Topic, err = pd.getString(); err != nil {
73117
return err

consumer_group_members_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ var (
4242
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
4343
0, 0, 0, 0, // OwnedPartitions KIP-429
4444
}
45+
46+
groupMemberMetadataV3NilOwned = []byte{
47+
0, 3, // Version
48+
0, 0, 0, 1, // Topic array length
49+
0, 3, 'o', 'n', 'e', // Topic one
50+
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
51+
0, 0, 0, 0, // OwnedPartitions KIP-429
52+
0, 0, 0, 64, // GenerationID
53+
0, 4, 'r', 'a', 'c', 'k', // RackID
54+
}
4555
)
4656

4757
func TestConsumerGroupMemberMetadata(t *testing.T) {
@@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
7787
}
7888
}
7989

90+
func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) {
91+
meta := new(ConsumerGroupMemberMetadata)
92+
if err := decode(groupMemberMetadataV3NilOwned, meta, nil); err != nil {
93+
t.Error("Failed to decode V3 data", err)
94+
}
95+
}
96+
8097
func TestConsumerGroupMemberAssignment(t *testing.T) {
8198
amt := &ConsumerGroupMemberAssignment{
8299
Version: 0,

0 commit comments

Comments
 (0)