Skip to content

Commit 1a8a3ed

Browse files
committed
fix(consumer): support JoinGroup V4
Had previously commented this out until I'd looked into the behaviour described in the protocol doc, but it looks like we should be safe to support this. Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
1 parent 40b52c5 commit 1a8a3ed

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

consumer_group.go

+11-10
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,10 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
318318
}
319319
return c.retryNewSession(ctx, topics, handler, retries, true)
320320
case ErrMemberIdRequired:
321-
// from JoinGroupRequest v4, if client start with empty member id,
322-
// it need to get member id from response and send another join request to join group
321+
// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
322+
// with an empty member id, it needs to get the assigned id from the
323+
// response and send another join request with that id to actually join the
324+
// group
323325
c.memberID = join.MemberId
324326
return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false)
325327
case ErrFencedInstancedId:
@@ -442,14 +444,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
442444
if c.config.Version.IsAtLeast(V2_0_0_0) {
443445
req.Version = 3
444446
}
445-
// XXX: protocol states "Starting from version 4, the client needs to issue a
446-
// second request to join group", so not enabling this until we can
447-
// investigate
448-
/*
449-
if c.config.Version.IsAtLeast(V2_2_0_0) {
450-
req.Version = 4
451-
}
452-
*/
447+
// from JoinGroupRequest v4 onwards (due to KIP-394) the client will actually
448+
// send two JoinGroupRequests, once with the empty member id, and then again
449+
// with the assigned id from the first response. This is handled via the
450+
// ErrMemberIdRequired case.
451+
if c.config.Version.IsAtLeast(V2_2_0_0) {
452+
req.Version = 4
453+
}
453454
if c.config.Version.IsAtLeast(V2_3_0_0) {
454455
req.Version = 5
455456
req.GroupInstanceId = c.groupInstanceId

0 commit comments

Comments
 (0)