Skip to content

Commit ce981d1

Browse files
committed
admin: add DeleteConsumerGroup
1 parent c50148e commit ce981d1

File tree

3 files changed

+84
-1
lines changed

3 files changed

+84
-1
lines changed

admin.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ type ClusterAdmin interface {
8484
// List the consumer group offsets available in the cluster.
8585
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
8686

87-
// Get information about the nodes in the cluster.
87+
// Delete a consumer group.
88+
DeleteConsumerGroup(group string) error
89+
90+
// Get information about the nodes in the cluster
8891
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
8992

9093
// Close shuts down the admin and closes underlying client.
@@ -614,3 +617,30 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
614617

615618
return coordinator.FetchOffset(request)
616619
}
620+
621+
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
622+
coordinator, err := ca.client.Coordinator(group)
623+
if err != nil {
624+
return err
625+
}
626+
627+
request := &DeleteGroupsRequest{
628+
Groups: []string{group},
629+
}
630+
631+
resp, err := coordinator.DeleteGroups(request)
632+
if err != nil {
633+
return err
634+
}
635+
636+
groupErr, ok := resp.GroupErrorCodes[group]
637+
if !ok {
638+
return ErrIncompleteResponse
639+
}
640+
641+
if groupErr != ErrNoError {
642+
return groupErr
643+
}
644+
645+
return nil
646+
}

admin_test.go

+30
Original file line numberDiff line numberDiff line change
@@ -859,3 +859,33 @@ func TestListConsumerGroupOffsets(t *testing.T) {
859859
}
860860

861861
}
862+
863+
func TestDeleteConsumerGroup(t *testing.T) {
864+
seedBroker := NewMockBroker(t, 1)
865+
defer seedBroker.Close()
866+
867+
group := "my-group"
868+
869+
seedBroker.SetHandlerByMap(map[string]MockResponse{
870+
// "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
871+
"DeleteGroupsRequest": NewMockDeleteGroupsRequest(t).SetDeletedGroups([]string{group}),
872+
"MetadataRequest": NewMockMetadataResponse(t).
873+
SetController(seedBroker.BrokerID()).
874+
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
875+
"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
876+
})
877+
878+
config := NewConfig()
879+
config.Version = V1_1_0_0
880+
881+
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
882+
if err != nil {
883+
t.Fatal(err)
884+
}
885+
886+
err = admin.DeleteConsumerGroup(group)
887+
if err != nil {
888+
t.Fatalf("DeleteConsumerGroup failed with error %v", err)
889+
}
890+
891+
}

mockresponses.go

+23
Original file line numberDiff line numberDiff line change
@@ -885,3 +885,26 @@ func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
885885
}
886886
return res
887887
}
888+
889+
type MockDeleteGroupsResponse struct {
890+
deletedGroups []string
891+
}
892+
893+
func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
894+
return &MockDeleteGroupsResponse{}
895+
}
896+
897+
func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
898+
m.deletedGroups = groups
899+
return m
900+
}
901+
902+
func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
903+
resp := &DeleteGroupsResponse{
904+
GroupErrorCodes: map[string]KError{},
905+
}
906+
for _, group := range m.deletedGroups {
907+
resp.GroupErrorCodes[group] = ErrNoError
908+
}
909+
return resp
910+
}

0 commit comments

Comments
 (0)