
Unable to rejoin a group when consumer poll timeout in confluent-kafka-go v2.1.0 #980

Closed

ljy201758505108 opened this issue Apr 10, 2023 · 16 comments · Fixed by confluentinc/librdkafka#4256

@ljy201758505108

ljy201758505108 commented Apr 10, 2023

Description

After upgrading confluent-kafka-go from v2.0.2 to v2.1.0, the consumer no longer rejoins the consumer group when the interval between two poll calls exceeds max.poll.interval.ms; with v2.0.2 it rejoined as expected.

After the group myTopicName changes join state wait-unassign-to-complete -> init, the two versions log different things:

v2.0.2: Group myTopicName join with 1 subscribed topic(s)...

v2.1.0: Requesting metadata for 1/1 topics: periodic topic and broker list refresh...

client config:

request.timeout.ms: 60000
retries: 5
socket.timeout.ms: 60000

consumer config:

session.timeout.ms: 45000
max.poll.interval.ms: 60000
heartbeat.interval.ms: 10000
socket.timeout.ms: 60000
enable.auto.commit: false
enable.auto.offset.store: false
group.id: myTopicName

How to reproduce

pseudo-code:

c, err := kafka.NewConsumer(..)
c.Subscribe(topic, nil)
for {
	ev := c.Poll(100)
	time.Sleep(65 * time.Second) // sleep 65s between polls so the 60s max.poll.interval.ms is exceeded
}

Checklist

Please provide the following information:

  • [√] confluent-kafka-go and librdkafka version (v2.0.2 to v2.1.0):
  • [√] Apache Kafka broker version: 2.3.0
  • [√] Client configuration: ConfigMap{...}
  • [√] Operating system: linux x86_64
  • [√] Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@baonq-me

baonq-me commented Apr 11, 2023

If your client takes too much time to process messages, for example the time.Sleep(65000) in your pseudo-code, then just disconnect from the cluster and establish a new connection after everything is processed.

Be aware that if there is a runtime error while processing messages, you must produce the message back to Kafka for retrying, because the previous message has already been marked as consumed.

As documented at https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-interval-ms, max.poll.interval.ms must be larger than the time spent in the sleep() call so that poll() is called in time.

If poll() is not called before expiration of this timeout, then the consumer is considered failed
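A minimal sketch of that recommendation, assuming placeholder broker, group, and topic names and a hypothetical process() helper: max.poll.interval.ms is set well above the worst-case per-message processing time so poll() is always called before the timeout expires.

package main

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Sketch: keep max.poll.interval.ms above the worst-case per-message
	// processing time. Broker, group, and topic names are placeholders.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":    "localhost:9092",
		"group.id":             "example-group",
		"max.poll.interval.ms": 300000, // 5 minutes, well above the ~65s processing time
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.Subscribe("example-topic", nil); err != nil {
		panic(err)
	}

	for {
		ev := c.Poll(100)
		if msg, ok := ev.(*kafka.Message); ok {
			process(msg) // must finish within max.poll.interval.ms
		}
	}
}

// process stands in for the application's long-running message handling.
func process(msg *kafka.Message) {
	time.Sleep(65 * time.Second)
}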

@milindl
Contributor

milindl commented Apr 11, 2023

By any chance, are you using "go.logs.channel.enable"?
There was a recent bug fixed around this, earlier max.poll.interval.ms was not honored while using this property.

I can't reproduce it locally given the pseudocode, it seems to work fine for me (I can rejoin the group).
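For reference, a minimal sketch of enabling that option, assuming placeholder broker and group names; the forwarded librdkafka logs are read from the channel returned by the consumer's Logs() method:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Sketch: forward librdkafka logs to a Go channel instead of stderr.
	// Broker and group names below are placeholders.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":      "localhost:9092",
		"group.id":               "example-group",
		"go.logs.channel.enable": true,
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Drain the log channel so it does not back up.
	go func() {
		for logEvent := range c.Logs() {
			fmt.Println(logEvent)
		}
	}()

	for {
		_ = c.Poll(100)
	}
}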

@ljy201758505108
Author

ljy201758505108 commented Apr 12, 2023

By any chance, are you using "go.logs.channel.enable"? There was a recent bug fixed around this, earlier max.poll.interval.ms was not honored while using this property.

I can't reproduce it locally given the pseudocode, it seems to work fine for me (I can rejoin the group).

Thanks for your reply. I'm not using “go.logs.channel.enable”. I tried adding this parameter in my test code, but the results were the same as before: the consumer did not rejoin the group after the error, yet it kept polling and receiving empty messages. When I looked at the group, there were no active consumers.

@ljy201758505108
Author

ljy201758505108 commented Apr 12, 2023

Here is my test code:

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	bootstrapServers := "xxxx:xx"
	group := "dGVzdDAz"
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        bootstrapServers,
		"group.id":                 group,
		"heartbeat.interval.ms":    100,
		"session.timeout.ms":       45000,
		"max.poll.interval.ms":     60000,
		"enable.auto.commit":       false,
		"auto.offset.reset":        "earliest",
		"enable.auto.offset.store": false,

		"security.protocol":                   "sasl_ssl",
		"sasl.mechanism":                      "PLAIN",
		"sasl.username":                       "xxxx",
		"sasl.password":                       "xxxx",
		"debug":                               "all",
	})
	if err != nil {
		fmt.Printf("err: %v\n", err)
		os.Exit(1)
	}
	c.Subscribe("dGVzdDAz", nil)

	run := true

	for run {
		ev := c.Poll(100)
		fmt.Printf("Message Received:%v\n", ev)
		time.Sleep(65 * time.Second) // exceed max.poll.interval.ms (60000 ms)
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

@haoxins

haoxins commented Apr 12, 2023

I'm facing the same issue after upgrading from 2.0.2 to 2.1.0.

@ljy201758505108
Author

Detailed logs:

%7|1681284240.710|RECV|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Received FetchResponse (v11, 70 bytes, CorrId 129, rtt 501.41ms)
%7|1681284240.710|FETCH|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Topic dGVzdDAz [0] MessageSet size 0, error "Success", MaxOffset 3, LSO 3, Ver 4/4
%7|1681284240.710|FETCH|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Fetch topic dGVzdDAz [0] at offset 3 (leader epoch 0, current leader epoch -1, v4)
%7|1681284240.710|FETCH|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Fetch 1/1/1 toppar(s)
%7|1681284240.710|SEND|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Sent FetchRequest (v11, 98 bytes @ 0, CorrId 130)
%7|1681284240.944|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1001: Heartbeat for group "dGVzdDAz" generation id 3
%7|1681284240.945|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent HeartbeatRequest (v3, 83 bytes @ 0, CorrId 132)
%7|1681284240.945|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Received HeartbeatResponse (v3, 6 bytes, CorrId 132, rtt 0.59ms)
%4|1681284240.984|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (60000ms) exceeded by 39ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%7|1681284240.984|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": updating member id "rdkafka-8240a75f-fb6b-4c37-a4eb-645d21097a84" -> ""
%7|1681284240.984|LEAVE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": leave (in state up)
%7|1681284240.984|LEAVE|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: Leaving group
%7|1681284240.984|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" initiating rebalance (EAGER) in state up (join-state steady) with 1 assigned partition(s) (lost): max.poll.interval.ms exceeded
%7|1681284240.984|LOST|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": current assignment of 1 partition(s) lost: max.poll.interval.ms exceeded: revoking assignment and rejoining
%7|1681284240.984|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" changed join state steady -> wait-unassign-call (state up)
%7|1681284240.984|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": delegating revoke of 1 partition(s) to application on queue rd_kafka_cgrp_new: max.poll.interval.ms exceeded
%7|1681284240.984|PAUSE|rdkafka#consumer-1| [thrd:main]: Pausing fetchers for 1 assigned partition(s): rebalance
%7|1681284240.984|PAUSE|rdkafka#consumer-1| [thrd:main]: Library pausing 1 partition(s)
%7|1681284240.984|BARRIER|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0]: rd_kafka_toppar_op_pause_resume:2270: new version barrier v5
%7|1681284240.984|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause dGVzdDAz [0] (v5)
%7|1681284240.984|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": clearing group assignment
%7|1681284240.984|OP|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0] received op PAUSE (v5) in fetch-state active (opv4)
%7|1681284240.984|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause dGVzdDAz [0]: at offset 0 (leader epoch 0) (state active, v5)
%7|1681284240.985|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Sent LeaveGroupRequest (v1, 77 bytes @ 0, CorrId 133)
%7|1681284240.986|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: Received LeaveGroupResponse (v1, 6 bytes, CorrId 133, rtt 1.06ms)
%7|1681284240.986|LEAVEGROUP|rdkafka#consumer-1| [thrd:main]: LeaveGroup response received in state up
%7|1681284241.145|FETCHDEC|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: Topic dGVzdDAz [0]: fetch decide: updating to version 5 (was 4) at offset 0 (leader epoch 0) (was offset 3 (leader epoch 0))
%7|1681284241.145|FETCH|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Topic dGVzdDAz [0] in state active at offset 0 (leader epoch 0) (6/100000 msgs, 0/65536 kb queued, opv 5) is not fetchable: paused
%7|1681284241.145|FETCHADD|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Removed dGVzdDAz [0] from fetch list (0 entries, opv 5): paused
%7|1681284241.211|RECV|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Received FetchResponse (v11, 70 bytes, CorrId 130, rtt 500.26ms)
%7|1681284241.211|DROP|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: dGVzdDAz [0]: dropping outdated fetch response (v4 < 5 or old rktp)
Message Received:Application maximum poll interval (60000ms) exceeded by 39ms
%7|1681284311.113|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" received op GET_REBALANCE_PROTOCOL in state up (join-state wait-unassign-call)
%7|1681284311.113|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" received op ASSIGN in state up (join-state wait-unassign-call)
%7|1681284311.113|CLEARASSIGN|rdkafka#consumer-1| [thrd:main]: Clearing current assignment of 1 partition(s)
%7|1681284311.113|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" changed join state wait-unassign-call -> wait-unassign-to-complete (state up)
%7|1681284311.113|LOST|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": current assignment no longer considered lost: unassign() called
%7|1681284311.113|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=1, wait_stop_cnt=0)
%7|1681284311.113|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 1 partition(s):
%7|1681284311.113|DUMP_REM|rdkafka#consumer-1| [thrd:main]:  dGVzdDAz [0] offset STORED
%7|1681284311.113|BARRIER|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0]: rd_kafka_toppar_op_fetch_stop:2210: new version barrier v6
%7|1681284311.113|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming dGVzdDAz [0] (v6)
%7|1681284311.113|BARRIER|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0]: rd_kafka_toppar_op_pause_resume:2270: new version barrier v7
%7|1681284311.113|RESUME|rdkafka#consumer-1| [thrd:main]: Resume dGVzdDAz [0] (v7)
%7|1681284311.113|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic dGVzdDAz [0]
%7|1681284311.113|REMOVE|rdkafka#consumer-1| [thrd:main]: Removing dGVzdDAz [0] from assignment (started=true, pending=false, queried=false, stored offset=INVALID)
%7|1681284311.113|REMOVE|rdkafka#consumer-1| [thrd:main]: Served 1 removed partition(s), with 0 offset(s) to commit
%7|1681284311.113|ASSIGNMENT|rdkafka#consumer-1| [thrd:main]: Current assignment of 0 partition(s) with 0 pending adds, 0 offset queries, 1 partitions awaiting stop and 0 offset commits in progress
%7|1681284311.113|OP|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0] received op FETCH_STOP (v6) in fetch-state active (opv5)
%7|1681284311.113|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for dGVzdDAz [0] in state active (v6)
%7|1681284311.113|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition dGVzdDAz [0] changed fetch state active -> stopping
%7|1681284311.113|STORETERM|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0]: offset store terminating
%7|1681284311.113|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition dGVzdDAz [0] changed fetch state stopping -> stopped
%7|1681284311.113|OP|rdkafka#consumer-1| [thrd:main]: dGVzdDAz [0] received op PAUSE (v7) in fetch-state stopped (opv6)
%7|1681284311.113|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped dGVzdDAz [0]: at offset 0 (leader epoch 0) (state stopped, v7)
%7|1681284311.113|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for dGVzdDAz [0]
%7|1681284311.113|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": delete dGVzdDAz [0]
Message Received:<nil>
%7|1681284311.113|STOPSERVE|rdkafka#consumer-1| [thrd:main]: All partitions awaiting stop are now stopped: serving assignment
%7|1681284311.113|DUMP|rdkafka#consumer-1| [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)
%7|1681284311.113|DUMP_ALL|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_PND|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_QRY|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|DUMP_REM|rdkafka#consumer-1| [thrd:main]: List with 0 partition(s):
%7|1681284311.113|ASSIGNDONE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": assignment operations done in join-state wait-unassign-to-complete (rebalance rejoin=false)
%7|1681284311.113|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": unassign done in state up (join-state wait-unassign-to-complete)
%7|1681284311.113|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz": Rejoining group without an assignment: Unassignment done
%7|1681284311.113|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "dGVzdDAz" changed join state wait-unassign-to-complete -> init (state up)
Message Received:<nil>
%7|1681284480.944|METADATA|rdkafka#consumer-1| [thrd:main]: Requesting metadata for 1/1 topics: periodic topic and broker list refresh
%7|1681284480.944|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: Request metadata for 1 topic(s): periodic topic and broker list refresh
%7|1681284480.944|SEND|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Sent MetadataRequest (v8, 38 bytes @ 0, CorrId 131)
%7|1681284480.945|RECV|rdkafka#consumer-1| [thrd:sasl_ssl://xxxxxxx:xxx/bootstrap]: sasl_ssl://xxxxxxx:xxx/1001: Received MetadataResponse (v8, 123 bytes, CorrId 131, rtt 0.69ms)
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: ===== Received metadata (for 1 requested topics): periodic topic and broker list refresh =====
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: ClusterId: NbUm8DJZTLKuFwYZ7uZxvQ, ControllerId: 1001
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: 1 brokers, 1 topics
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001:   Topic dGVzdDAz with 1 partitions
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]:   Topic dGVzdDAz partition 0 Leader 1001 Epoch -1
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001:   Broker #0/1: xxxxxxx:xxx NodeId 1001
%7|1681284480.945|METADATA|rdkafka#consumer-1| [thrd:main]: sasl_ssl://xxxxxxx:xxx/1001: 1/1 requested topic(s) seen in metadata
Message Received:<nil>

@milindl
Contributor

milindl commented Apr 13, 2023

Thanks for this report. I was able to reproduce it locally as well, I'll look into it further.

@marcosbmf

+1, having the same issue after upgrading from 2.0.2 to 2.1.0.
On 2.0.2 my workers ran fine; now they leave the group and don't rejoin, with the following error:

%4|1681402127.003|MAXPOLL|rdkafka#consumer-1| [thrd:main]: Application maximum poll interval (20000ms) exceeded by 264ms (adjust max.poll.interval.ms for long-running message processing): leaving group

@jadamscn

I am also getting this, despite poll() definitely being called.

Steps to reproduce:

  1. Create a new empty topic
  2. Have the consumer join a group and subscribe to that topic
  3. Run poll in a for loop - e.g. for { c.Poll(100) }
  4. Do not send any messages to the topic for the duration of max.poll.interval.ms

Despite poll being called (and logs showing the fetch), the consumer is still kicked out of the group after max.poll.interval.ms.
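A minimal sketch of those reproduction steps, assuming placeholder broker, group, and topic names and a locally created empty topic:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Sketch of the steps above: subscribe to an empty topic and poll
	// continuously; on the affected version the MAXPOLL warning still
	// appears after max.poll.interval.ms despite the constant polling.
	// Broker, group, and topic names below are placeholders.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":    "localhost:9092",
		"group.id":             "empty-topic-group",
		"max.poll.interval.ms": 60000,
		"debug":                "cgrp", // log consumer-group activity
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.Subscribe("empty-topic", nil); err != nil {
		panic(err)
	}

	for {
		if ev := c.Poll(100); ev != nil {
			fmt.Printf("event: %v\n", ev)
		}
	}
}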

@fkarakas

+1, same issue. Where is @edenhill?

@milindl
Contributor

milindl commented Apr 20, 2023

This is a librdkafka issue, we're fixing it here: confluentinc/librdkafka#4256

@fkarakas

OK, thanks @milindl. But what I don't understand is why this kind of problem wasn't caught easily by a basic test in the confluent-kafka-go library.

@emasab
Contributor

emasab commented Apr 20, 2023

@fkarakas about Magnus, check this comment.

@milindl
Contributor

milindl commented Apr 20, 2023

And for the tests, yes, we should have ideally detected it.
We'll add a test once we fix the issue, since the current suite (and the much larger librdkafka suite) passed despite this issue.

@milindl milindl added the HIGH label Apr 20, 2023
@fkarakas

OK.
But it is always preferable to have a failing test before the fix, so when the fix is done and the test passes you are sure that your test and the fix are correct :)

@milindl
Contributor

milindl commented Apr 25, 2023

@fkarakas, yep, that's what we did: we added a failing test to librdkafka, where the actual issue exists. I was talking about extending the Go test suite to include more tests, too.

The fix is available in v2.1.1-RC1, and we expect to have it in v2.1.1 when it is released after some soak testing.
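
For anyone who wants to try the release candidate before v2.1.1 is out, pinning the module to that pre-release tag should look roughly like this (assuming the tag name matches the version mentioned above):

go get github.com/confluentinc/confluent-kafka-go/v2@v2.1.1-RC1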
