
Kafka consumer leaves group after 5 mins #110

Closed
blindspotbounty opened this issue Aug 9, 2023 · 2 comments
@blindspotbounty (Collaborator)

KafkaConsumer stops after 5 minutes of consumption with the error:

[MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group

There are two problems:

  1. The error itself
  2. The error is not reported to client code (i.e. it is not handled in eventPoll)

How to reproduce

As a minimal reproduction, I suggest the following diff to an existing test, which makes the message sequence large enough:

diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift
index d18cd72..f0ceb94 100644
--- a/Sources/Kafka/RDKafka/RDKafkaClient.swift
+++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift
@@ -183,6 +183,10 @@ final class RDKafkaClient: Sendable {
                 self.handleLogEvent(event)
             case .offsetCommit:
                 self.handleOffsetCommitEvent(event)
+            case .error:
+                let err = rd_kafka_event_error_string(event)
+                let error = String(cString: err!)
+                fatalError("Got an error for: \(error)")
             case .none:
                 // Finished reading events, return early
                 return events
diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift
index b5c551e..a5edd5d 100644
--- a/Tests/IntegrationTests/KafkaTests.swift
+++ b/Tests/IntegrationTests/KafkaTests.swift
@@ -291,7 +291,7 @@ final class KafkaTests: XCTestCase {
     }
 
     func testCommittedOffsetsAreCorrect() async throws {
-        let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
+        let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10000000) // some endless sequence of messages
         let firstConsumerOffset = testMessages.count / 2
         let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
 
@@ -307,6 +307,9 @@ final class KafkaTests: XCTestCase {
             ),
             bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
         )
+//      Uncomment to speed up (30 seconds instead of 5 mins):
+//        consumer1Config.maximumPollInterval = .seconds(30)
+//        consumer1Config.session.timeout = .seconds(30)
         consumer1Config.autoOffsetReset = .beginning // Read topic from beginning
         consumer1Config.broker.addressFamily = .v4
 
@@ -455,6 +458,7 @@ final class KafkaTests: XCTestCase {
         var messageIDs = Set<KafkaProducerMessageID>()
 
         for message in messages {
+            try await Task.sleep(for: .seconds(1)) // avoid queue overflow
             messageIDs.insert(try producer.send(message))
         }

The result of running this test is:

Test Case '-[IntegrationTests.KafkaTests testCommittedOffsetsAreCorrect]' started.
2023-08-09T15:07:04+0300 warning kafka.test : [MAXPOLL] [thrd:main]: Application maximum poll interval (300000ms) exceeded by 472ms (adjust max.poll.interval.ms for long-running message processing): leaving group
Kafka/RDKafkaClient.swift:189: Fatal error: Got an error for: Application maximum poll interval (300000ms) exceeded by 472ms

This is very similar to confluentinc/confluent-kafka-go#980, but not the same: that bug was fixed in librdkafka 2.1.1, which is already part of swift-kafka-gsoc.
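As an aside, when message processing legitimately takes a long time, the poll interval and session timeout can be raised in the consumer configuration. Below is a hedged sketch using the configuration properties that appear in the diff above (`maximumPollInterval`, `session.timeout`); the group id, topic, and broker address are placeholders, and the exact initializer signatures may differ between package versions:

```swift
import Kafka

// Hypothetical consumer setup; group, topic, and broker are placeholders.
var consumerConfig = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "test-group", topics: ["test-topic"]),
    bootstrapBrokerAddresses: [KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)]
)
// Give slow message handlers more headroom before the broker
// evicts the consumer from the group (librdkafka's default is 5 minutes).
consumerConfig.maximumPollInterval = .seconds(600)
consumerConfig.session.timeout = .seconds(45)
```

Note that raising these values only masks the symptom reported here; the underlying bug is that polling did not reset the timer at all.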

felixschlegel added a commit to felixschlegel/swift-kafka-client that referenced this issue Aug 9, 2023
Motivation:

This PR fixes issue swift-server#110.

`KafkaConsumer`: by polling the `rd_kafka_queue_get_main` queue instead of the `rd_kafka_queue_get_consumer` queue,
the timer for `max.poll.interval.ms` did not get reset, which eventually
resulted in a timeout despite polling. (See the [`librdkafka`
documentation](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#acacdb55ae7cb6abfbde89621e512b078).)

Modifications:

* `RDKafkaClient`:
    * rename `mainQueue` to `queue`
    * use `rd_kafka_queue_get_consumer` instead of
      `rd_kafka_queue_get_main` for `KafkaConsumer` clients
      -> this resets the timer for `max.poll.interval.ms` so that
         the consumer does not time out while it keeps polling
    * invoke `rd_kafka_queue_destroy(self.queue)` on
      `RDKafkaClient.deinit` to lose the reference to the queue
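The modification list above can be sketched as code. This is an illustrative reconstruction, not the verbatim patch; it assumes the package's C shim exposes the librdkafka calls under their C names:

```swift
// Inside RDKafkaClient.init (illustrative):
// Before the fix, events were read from the main queue, which does
// NOT reset the `max.poll.interval.ms` timer for consumer clients:
//     self.queue = rd_kafka_queue_get_main(handle)
//
// After the fix, consumer clients poll their dedicated consumer
// queue, so every poll resets the `max.poll.interval.ms` timer.
// `rd_kafka_queue_get_consumer` returns nil unless
// `rd_kafka_poll_set_consumer` was called first, so fail loudly:
guard let consumerQueue = rd_kafka_queue_get_consumer(handle) else {
    fatalError("rd_kafka_queue_get_consumer returned nil; was rd_kafka_poll_set_consumer called?")
}
self.queue = consumerQueue

// Inside RDKafkaClient.deinit (illustrative):
// Drop our reference to the queue so librdkafka can release it.
rd_kafka_queue_destroy(self.queue)
```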
@felixschlegel (Contributor)

Hello @blindspotbounty,

Thank you very much for raising this issue and providing the steps to reproduce; this really helped debug the issue and eventually find the fix! (#113)

The problem here is that we were using rd_kafka_queue_get_main instead of rd_kafka_queue_get_consumer, which meant that the max.poll.interval.ms timer did not get reset (see documentation).

Thanks a lot,

Felix

FranzBusch pushed a commit that referenced this issue Aug 10, 2023
* Fix: Kafka consumer leaves group after 5 mins

* Review blindspot

Modifications:

* update comment at invocation of `RDKafkaClient.pollSetConsumer`
* don't fail softly when `rd_kafka_queue_get_consumer` returns `nil`
* don't create new reference to consumer queue in
  `RDKafkaClient.consumerClose()`
@blindspotbounty (Collaborator, Author)

Now works as expected!
@felixschlegel thank you for the fix, closing the issue!
