Kafka upgrade - compatible with old and new versions #4770
Conversation
Removed logging of message
…aylog2-server into kafka_compatible
@joschi: This PR now includes the suggestions you requested in order to use Kafka 1.1.0. Old Kafka inputs (0.9.0.1) will work without any needed migration or changes. A new input transport was created for Kafka 1.1.0, and new inputs were created for Kafka Raw, Syslog, and GELF.
@muralibasani If you want the PR to be reviewed, we kindly ask you to sign the CLA.
Just noticed that I did not sign. Did it now. Thanks.
Can someone look at this PR? Not sure if I am missing something else.
@muralibasani Thank you, I will take a look shortly.
@muralibasani Not sure if I am missing something, but to me it looks like the
@pbr0ck3r @muralibasani Heh, okay. I think there was a misunderstanding. 😄 We can leave the 0.9.x library in the pom to continue to support the old Kafka versions (and the journal) and include the new Kafka 1.x consumer API as well. Since they use different Java packages, we should be able to have them both. The new Kafka transport can then use the new consumer library and thus support new Kafka versions.
Just to make sure we are on the same page here @bernd: you want us to include both dependencies, the old consumer API (0.9.x) and the new consumer API (1.x). We then integrate/use the new libraries in the KafkaNewTransport.
@pbr0ck3r Yep, exactly! 👍
Guys, that is not going to work. Changing the pom to include only the new Kafka version does not work for old brokers. They don't use entirely different packages; the kafka.consumer package is used in both old and new versions. I have of course tested this and know it very well. You can try changing it and check it too.
Adding to my previous comment: the aim of the PR is to not break existing brokers while also giving the flexibility to work with new brokers via a new transport/UI. It is only possible with this solution. Please see joschi's comments from Apr 20 in the old PR #4729.
I didn't know about that. I thought the new Java API ( That means we have to stay on the 0.9.0.1 version for I have some questions to avoid further confusion:
@bernd org.apache.kafka.clients.consumer.KafkaConsumer v0.9.0.1 works with 0.9.0.1 brokers and also with 1.x brokers?
@bernd I know that this is marked as ready for review. This PR includes support for Kafka 1.0.x, but now 1.1.x and also 2.x are out. What are GL's thoughts on upgrading to 1.1.x or even 2.x? I am just trying to get a feel for how GL is going to support Kafka in the future.
@pbr0ck3r Thanks for the heads up! Yeah, we definitely want to support newer versions of Kafka in the future. We didn't have enough resources to do it yet, unfortunately. We also need to figure out an upgrade path for users that currently use older Kafka setups with Graylog.
@bernd Cool! I might start looking into Kafka 1.1.x and the amount of work it would take to upgrade this PR to 1.1.x, as well as looking into 2.x and how much work it would be to upgrade to that. I am also currently running this PR code in our production instance with no known issues.
Just a note that this PR works with Kafka 1.1.1 (which is the latest 1.1.x release).
@bernd Kafka 2.1 works as well with this PR.
For the love of god, would someone approve this?
Hello, does this PR have any chance to be reviewed and then merged? :) Thanks!
It would be useful to have an official build with this integration instead of building our own custom version. Thanks guys,
Hi!
Would splitting the Kafka inputs out into plugins (and thus making it easier to replace/update them) help you at all?
Hello @kroepke,
Hello @muralibasani @kroepke, I made this repo with your work as a plugin (it's probably dirty, but working :D).
Hi, is there any timeline for supporting this feature? Maybe only for a limited range of Kafka versions, >= 1.1.x would be fine :-) We are waiting for this PR because of missing SSL/TLS support and otherwise need alternatives. We would like to stick with Graylog, so maybe there is also the chance to officially provide the proposed graylog-kafka-plugin? Thanks for a heads-up!
@hgnoyke Thanks for the heads-up. We will check if we can include it in the 3.2 release.
The other day this really beautiful girl started talking to me and I could tell she was really interested in me but then I remembered that whenever something really great is about to happen to a person if you can look into the future you'll probably find only pain and agony so despite almost thinking "you know what - this girl is so hot and intelligent maybe I'll be one of the lucky few" and running with it I quickly pulled out my Android and brought up this GitHub issue and quickly found my center and got jerked back into reality. Thank you!
Reviewed code excerpt:
topics.stream()
        .forEach(topicName -> {
            if (topicName != null && topicName.matches(configuration.getString(CK_TOPIC_FILTER))) {
This is duplicating functionality in KafkaClient, which already supports pattern matching with the KafkaClient.subscribe(Pattern) API call, and iterating through all topics like this is potentially really inefficient (and doesn't handle updates to available topics).
It'd be better to avoid requiring Zookeeper as a parameter at all and just let the KafkaClient library handle this.
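For illustration only, here is a minimal, self-contained sketch of what pattern-based subscription looks like with the new consumer API: the client matches topic names against the pattern itself (and picks up topics created later), so no manual topic listing or filtering is needed. The broker address, group id, and topic pattern below are placeholders, not the PR's actual configuration:

```java
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PatternSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("group.id", "pattern-subscribe-sketch"); // placeholder consumer group
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Let the client handle topic matching instead of iterating over all topics.
            // (Newer clients also offer a subscribe(Pattern) overload without a listener.)
            consumer.subscribe(Pattern.compile("^my-topic-.*"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
            });

            // poll(long) is used because it exists across the client versions discussed
            // in this thread; 2.x clients prefer poll(Duration).
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(record.topic() + "/" + record.partition() + " @ " + record.offset());
            }
        }
    }
}
```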
Sent in a pull request to address this.
I will take a look
Newer Kafka versions are supported by using the new high level consumer API. Lucky for us, support for the new API is already included in our currently used Kafka client library. This means we can avoid updating it, and still be compatible with our on-disk journal. Instead of creating a new Transport class, use a legacy switch to keep supporting older Kafka versions. Most of this was spearheaded by Muralidhar Basani in #4770 Thank you very much! :-) Co-authored-by: Muralidhar Basani <murali.basani@gmail.com>
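The commit message above describes keeping a single transport and branching on a legacy switch instead of shipping a second Transport class. The following is only a rough sketch of that idea; the class, method, and flag names are invented for illustration and are not the identifiers used in Graylog's actual transport code:

```java
// Purely illustrative: shows the "legacy switch" idea, not Graylog's real implementation.
public class KafkaTransportLegacySwitchSketch {
    // In the real input this would come from a configuration field (e.g. a "legacy mode" checkbox).
    private final boolean legacyMode;

    public KafkaTransportLegacySwitchSketch(boolean legacyMode) {
        this.legacyMode = legacyMode;
    }

    public void launch() {
        if (legacyMode) {
            // Old consumer path: the classic high-level consumer (kafka.consumer.*),
            // configured via ZooKeeper, keeps existing 0.9.x broker setups working unchanged.
            runLegacyConsumer();
        } else {
            // New consumer path: org.apache.kafka.clients.consumer.KafkaConsumer,
            // configured via bootstrap.servers, supports newer brokers.
            runNewConsumer();
        }
    }

    private void runLegacyConsumer() {
        System.out.println("Would start the classic high-level consumer here.");
    }

    private void runNewConsumer() {
        System.out.println("Would start the new KafkaConsumer-based poll loop here.");
    }

    public static void main(String[] args) {
        new KafkaTransportLegacySwitchSketch(false).launch();
    }
}
```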
* Add support for newer Kafka versions
  Newer Kafka versions are supported by using the new high level consumer API. Luckily for us, support for the new API is already included in our currently used Kafka client library. This means we can avoid updating it and still be compatible with our on-disk journal. Instead of creating a new Transport class, use a legacy switch to keep supporting older Kafka versions. Most of this was spearheaded by Muralidhar Basani in #4770. Thank you very much! :-)
  Co-authored-by: Muralidhar Basani <murali.basani@gmail.com>
* Drop stale comment
* Allow ordering of configuration fields
  Fields in generated configuration forms are not placed in any specific order (apart from the fact that optional fields are ordered at the end). With this change it is possible to specify an explicit position attribute on a config field. The default positioning value is 100. If you want your field higher up in the form, choose a value below 100. If you want it at the end, set it higher than 100.
* Fix validation, adjust comments
* Add support for custom properties
  This allows users to configure things like TLS (see the example sketch below).
* Set system wide default for TLS protocols
* Revert "Set system wide default for TLS protocols"
  Better to leave this entirely up to the user. This reverts commit dabaa00.
* Avoid logging ConsumerConfig
  It might contain sensitive credentials.
* Adjust comment
* Improve Exception handling and refactor code
  Avoid resource leaks by handling more exceptions. Shut down the input on unrecoverable exceptions. Retry on others. Add workaround for hanging poll if Kafka is down: https://issues.apache.org/jira/browse/KAFKA-4189
* Restart Input after unrecoverable Kafka error
  Also fix thread leak. We need to shut down the ExecutorService.
* Don't restart the Input after a KafkaException
  This reacharound via the eventbus is too ugly for a problem that I can't trigger in practice. Simply log the exception and stop the transport.
* Stop transport if we can't process the records
  If our processing throws an exception while consuming the records into our journal, it's better to stop the transport early so we don't lose too many messages. (enable.auto.commit is our default)
* Simplify consumeRecords()
  With the old Kafka client, every next() call would mark the offset as committed. With the new client, this isn't the case anymore, thus we can simplify the code.
* Improve position defaults

Co-authored-by: Muralidhar Basani <murali.basani@gmail.com>
Co-authored-by: Bernd Ahlers <bernd@graylog.com>
(cherry picked from commit 37ec5db)
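One of the commit entries above adds support for custom consumer properties so that users can configure things like TLS. As a rough illustration of the kind of properties involved: the keys below are standard Kafka client settings, while the paths and passwords are placeholders; how Graylog exposes these in the input configuration is not shown here.

```java
import java.util.Properties;

public class KafkaTlsPropertiesSketch {
    public static void main(String[] args) {
        // Standard Kafka client TLS settings; all values are placeholders.
        Properties custom = new Properties();
        custom.put("security.protocol", "SSL");
        custom.put("ssl.truststore.location", "/etc/graylog/kafka-truststore.jks");
        custom.put("ssl.truststore.password", "changeit");
        // Only needed if the broker requires client certificates (mutual TLS):
        custom.put("ssl.keystore.location", "/etc/graylog/kafka-keystore.jks");
        custom.put("ssl.keystore.password", "changeit");

        custom.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```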
Thank you @muralibasani and all the others for your help and for your patience. I'd appreciate tests and feedback.
Currently Graylog2 is limited to running with Kafka 0.9.0.1 or lower versions. This change overcomes that by being compatible with old and new versions, from 0.9.0.1 up to 1.1.0.
Description
Keep org.apache.kafka:kafka_2.11:0.9.0.1, so that the message journal stays compatible with previous versions of Graylog. (No changes to the pom or Kafka versions.)
Created a new Kafka transport instead of changing the existing KafkaTransport class, and used the new transport to create a new Kafka input. The old input still works with older Kafka brokers.
This way users can decide which version of Kafka they want to support, and existing setups won't break.
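As a side note on why the two inputs can coexist: the classic high-level consumer and the new consumer API live in different classes and packages, so, assuming the Kafka client libraries on the classpath provide both (the kafka_2.11 artifact pulls in the kafka-clients jar), each transport can use its own entry point. A minimal, purely illustrative sketch with placeholder connection settings:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class OldAndNewConsumerSketch {
    public static void main(String[] args) {
        // Classic high-level consumer (used by the existing input), configured via ZooKeeper.
        Properties oldProps = new Properties();
        oldProps.put("zookeeper.connect", "localhost:2181"); // placeholder
        oldProps.put("group.id", "old-consumer-sketch");     // placeholder
        ConsumerConnector oldConsumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(oldProps));

        // New consumer API (used by the new transport), configured via bootstrap.servers.
        Properties newProps = new Properties();
        newProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        newProps.put("group.id", "new-consumer-sketch");       // placeholder
        newProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        newProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        KafkaConsumer<byte[], byte[]> newConsumer = new KafkaConsumer<>(newProps);

        oldConsumer.shutdown();
        newConsumer.close();
    }
}
```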
Motivation and Context
The change is required to make Graylog2 run with newer Kafka versions like 1.1.0, while not breaking the old Kafka version 0.9.0.1.
How Has This Been Tested?
Tested with different versions of Kafka brokers.
Screenshots (if appropriate):
Types of changes
Checklist: