Add support for high level Kafka consumer mode #7504
Conversation
Newer Kafka versions are supported by using the new high-level consumer API. Lucky for us, support for the new API is already included in the Kafka client library we currently use. This means we can avoid updating it and still stay compatible with our on-disk journal. Instead of creating a new Transport class, use a legacy switch to keep supporting older Kafka versions.

Most of this was spearheaded by Muralidhar Basani in #4770. Thank you very much! :-)

Co-authored-by: Muralidhar Basani <murali.basani@gmail.com>
Fields in generated configuration forms are not placed in any specific order (apart from optional fields being ordered at the end). With this change it is possible to specify an explicit position attribute on a config field. The default position value is 100. If you want your field higher up in the form, choose a value below 100; if you want it at the end, set it higher than 100.
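A minimal sketch of the position-based ordering described above. The class and field names here are illustrative placeholders, not the actual Graylog `ConfigurationField` API; only the numbering convention (default 100, lower floats up, higher sinks down) comes from the change itself:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch of position-based ordering of configuration fields.
public class FieldOrdering {
    static final int DEFAULT_POSITION = 100; // fields without an explicit position land here

    record Field(String name, int position) {}

    // Sort fields by position; values below 100 float to the top, above 100 sink to the end.
    public static List<String> ordered(List<Field> fields) {
        List<Field> sorted = new ArrayList<>(fields);
        sorted.sort(Comparator.comparingInt(Field::position));
        return sorted.stream().map(Field::name).toList();
    }

    public static void main(String[] args) {
        List<Field> fields = List.of(
                new Field("throttling_allowed", 110),   // explicitly last
                new Field("bootstrap_servers", 10),     // explicitly first
                new Field("topic_filter", DEFAULT_POSITION));
        // Prints [bootstrap_servers, topic_filter, throttling_allowed]
        System.out.println(ordered(fields));
    }
}
```

Because the sort is stable, fields sharing the default position of 100 keep their declaration order relative to each other.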
This allows users to configure things like TLS.
Better to leave this entirely up to the user.

This reverts commit dabaa00.
It might contain sensitive credentials
Avoid resource leaks by handling more exceptions. Shut down the input on unrecoverable exceptions; retry on others. Add a workaround for a hanging poll when Kafka is down: https://issues.apache.org/jira/browse/KAFKA-4189

Also fix a thread leak: we need to shut down the ExecutorService.
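The thread-leak fix boils down to the standard ExecutorService shutdown pattern. A minimal, self-contained sketch (the helper name and timeout are illustrative, not taken from the actual KafkaTransport code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the thread-leak fix: always shut the ExecutorService
// down, and escalate to shutdownNow() if tasks keep running past a deadline.
public class ExecutorShutdown {
    public static boolean shutdownGracefully(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // stop accepting new tasks
        try {
            if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                // Interrupt lingering tasks, e.g. a consumer loop stuck in a blocking call.
                executor.shutdownNow();
                return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("consumer loop would run here"));
        System.out.println("terminated cleanly: " + shutdownGracefully(executor, 5));
    }
}
```

Without the `shutdown()`/`awaitTermination()` step, the pool's non-daemon worker threads outlive the input and leak on every restart.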
This reacharound via the event bus is too ugly for a problem that I can't trigger in practice. Simply log the exception and stop the transport.

If our processing throws an exception while consuming the records into our journal, it's better to stop the transport early so we don't lose too many messages. (enable.auto.commit is our default)

With the old Kafka client, every next() call would mark the offset as committed. With the new client this isn't the case anymore, so we can simplify the code.
Looks good, just some remarks for the configuration field positions.
Code looks good, tested and works as expected!
Squashed commit (the individual commit messages above):

* Add support for newer Kafka versions
* Drop stale comment
* Allow ordering of configuration fields
* Fix validation, adjust comments
* Add support for custom properties
* Set system wide default for TLS protocols
* Revert "Set system wide default for TLS protocols"
* Avoid logging ConsumerConfig
* Adjust comment
* Improve Exception handling and refactor code
* Restart Input after unrecoverable Kafka error
* Don't restart the Input after a KafkaException
* Stop transport if we can't process the records
* Simplify consumeRecords()
* Improve position defaults

Co-authored-by: Muralidhar Basani <murali.basani@gmail.com>
Co-authored-by: Bernd Ahlers <bernd@graylog.com>

(cherry picked from commit 37ec5db)
Good to see this finally. Thank you @dennisoelkers
PR #7504 added support for specifying an explicit position on configuration fields. This adds constructors to the remaining configuration fields that were lacking this option: DropdownField, ListField and NumberField.
* Supply position parameter to remaining configuration fields

  PR #7504 added support for specifying an explicit position on configuration fields. This adds constructors to the remaining configuration fields that were lacking this option: DropdownField, ListField and NumberField.

* Reduce some duplication

Co-authored-by: Bernd Ahlers <bernd@graylog.com>
* Supply position parameter to remaining configuration fields (PR #7504 added support for specifying an explicit position on configuration fields; this adds constructors to the remaining fields that were lacking this option: DropdownField, ListField and NumberField.) * Reduce some duplication Co-authored-by: Bernd Ahlers <bernd@graylog.com> (cherry picked from commit 1bccb32)
…7896) * Supply position parameter to remaining configuration fields (PR #7504 added support for specifying an explicit position on configuration fields; this adds constructors to the remaining fields that were lacking this option: DropdownField, ListField and NumberField.) * Reduce some duplication Co-authored-by: Bernd Ahlers <bernd@graylog.com> (cherry picked from commit 1bccb32)
Newer Kafka versions are supported by using the new high level consumer API.
Lucky for us, support for the new API is already included in our currently used Kafka
client library. This means we can avoid updating it, and still be
compatible with our on-disk journal.
Instead of creating a new Transport class, use a legacy switch to keep
supporting older Kafka versions.
Most of this was spearheaded by Muralidhar Basani in
#4770
Thank you very much! :-)
To configure the new Kafka client, a list of one or more Kafka brokers needs to be provided.
The legacy one needs to connect to Zookeeper.
Introduce a custom properties field, so users can tweak advanced settings or set up TLS.
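To illustrate the difference between the two modes, here is a minimal sketch of the connection settings involved, using standard Kafka client configuration keys. Host names, paths, and the group id are placeholders, and the exact Graylog field names are not shown:

```java
import java.util.Properties;

// Sketch of the two configuration styles. Keys are standard Kafka client
// settings; all values are placeholders.
public class KafkaConfigSketch {
    // New high-level consumer: needs a list of one or more brokers.
    public static Properties newConsumerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        props.setProperty("group.id", "graylog2");
        props.setProperty("enable.auto.commit", "true"); // the default mentioned above
        // Custom properties allow advanced tweaks such as TLS:
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", "/etc/ssl/kafka-truststore.jks");
        return props;
    }

    // Legacy consumer: connects to Zookeeper instead of the brokers.
    public static Properties legacyConsumerConfig() {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "zookeeper1:2181");
        props.setProperty("group.id", "graylog2");
        return props;
    }
}
```

The custom properties field is what lets users merge settings like `security.protocol` into the consumer configuration without a dedicated form field for each one.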
Refs #5088
Refs #3960
Fixes #5819
Fixes #5778
Fixes #5001
Fixes #4481
Fixes #3730
Fixes #2780