Add support for high level Kafka consumer mode #7504

Merged · 19 commits · Mar 11, 2020 · Changes from 10 commits
KafkaTransport.java
@@ -20,6 +20,7 @@
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
@@ -36,16 +37,21 @@
import kafka.consumer.Whitelist;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.BooleanField;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
import org.graylog2.plugin.configuration.fields.DropdownField;
import org.graylog2.plugin.configuration.fields.NumberField;
import org.graylog2.plugin.configuration.fields.TextField;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
@@ -58,8 +64,11 @@
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -68,22 +77,27 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.IntStream;

import static com.codahale.metrics.MetricRegistry.name;

public class KafkaTransport extends ThrottleableTransport {
public static final String CK_LEGACY = "legacy_mode";
public static final String CK_FETCH_MIN_BYTES = "fetch_min_bytes";
public static final String CK_FETCH_WAIT_MAX = "fetch_wait_max";
public static final String CK_ZOOKEEPER = "zookeeper";
public static final String CK_BOOTSTRAP = "bootstrap_server";
public static final String CK_TOPIC_FILTER = "topic_filter";
public static final String CK_THREADS = "threads";
public static final String CK_OFFSET_RESET = "offset_reset";
public static final String CK_GROUP_ID = "group_id";
public static final String CK_CUSTOM_PROPERTIES = "custom_properties";

// See https://kafka.apache.org/090/documentation.html for available values for "auto.offset.reset".
private static final ImmutableMap<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
"largest", "Automatically reset the offset to the largest offset",
"smallest", "Automatically reset the offset to the smallest offset"
"largest", "Automatically reset the offset to the latest offset", // "largest" OR "latest"
"smallest", "Automatically reset the offset to the earliest offset" // "smallest" OR "earliest"
);

private static final String DEFAULT_OFFSET_RESET = "largest";
@@ -173,17 +187,124 @@ public void setMessageAggregator(CodecAggregator ignored) {
}

@Override
public void doLaunch(final MessageInput input) {
final boolean legacyMode = configuration.getBoolean(CK_LEGACY, true);
if (legacyMode) {
final String zooKeeper = configuration.getString(CK_ZOOKEEPER);
if (Strings.isNullOrEmpty(zooKeeper)) {
throw new IllegalArgumentException("ZooKeeper configuration setting cannot be empty");
}
} else {
final String bootStrap = configuration.getString(CK_BOOTSTRAP);
if (Strings.isNullOrEmpty(bootStrap)) {
throw new IllegalArgumentException("Bootstrap server configuration setting cannot be empty");
}
}

serverStatus.awaitRunning(() -> lifecycleStateChange(Lifecycle.RUNNING));
// listen for lifecycle changes
serverEventBus.register(this);

if (legacyMode) {
doLaunchLegacy(input);
} else {
doLaunchConsumer(input);
}
scheduler.scheduleAtFixedRate(() -> lastSecBytesRead.set(lastSecBytesReadTmp.getAndSet(0)), 1, 1, TimeUnit.SECONDS);
}

private void doLaunchConsumer(final MessageInput input) {
final Properties props = new Properties();

props.put("group.id", configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
props.put("fetch.max.wait.ms", String.valueOf(configuration.getInt(CK_FETCH_WAIT_MAX)));
//noinspection ConstantConditions
props.put("bootstrap.servers", configuration.getString(CK_BOOTSTRAP));
// Map largest -> latest, smallest -> earliest
final String resetValue = configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET);
props.put("auto.offset.reset", resetValue.equals("largest") ? "latest" : "earliest");
// Default auto commit interval is 60 seconds. Reduce to 1 second to minimize message duplication
// if something breaks.
props.put("auto.commit.interval.ms", "1000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

insertCustomProperties(props);

final int numThreads = configuration.getInt(CK_THREADS);
final ExecutorService executor = executorService(numThreads);
// this is used during shutdown to first stop all submitted jobs before committing the offsets back to Kafka
// and then closing the consumers.
// this is to avoid yanking away the connection from the consumer runnables
stopLatch = new CountDownLatch(numThreads);

IntStream.range(0, numThreads).forEach(i -> {
executor.submit(() -> {
final Properties nprops = (Properties) props.clone();
nprops.put("client.id", "gl2-" + nodeId + "-" + input.getId() + "-" + i);
final KafkaConsumer<byte[], byte[]> consumer;
try {
consumer = new KafkaConsumer<>(nprops);
//noinspection ConstantConditions
consumer.subscribe(Pattern.compile(configuration.getString(CK_TOPIC_FILTER)), new NoOpConsumerRebalanceListener());
} catch (Exception e) {
LOG.warn("Could not create KafkaConsumer", e);
throw e;
}

while (!stopped) {
final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
final Iterator<ConsumerRecord<byte[], byte[]>> consumerIterator = consumerRecords.iterator();
try {
// we have to use hasNext() here instead of a foreach loop, so we can check the paused/stopped state before taking each record
// noinspection WhileLoopReplaceableByForEach
while (consumerIterator.hasNext()) {
if (paused) {
// we try not to spin here, so we wait until the lifecycle goes back to running.
LOG.debug("Message processing is paused, blocking until message processing is turned back on.");
Uninterruptibles.awaitUninterruptibly(pausedLatch);
}
// check for being stopped before actually getting the message, otherwise we could end up losing that message
if (stopped) {
break;
}
if (isThrottled()) {
blockUntilUnthrottled();
}

// take the next record. with auto-commit enabled, its offset can be committed in the background,
// which gets tricky if we get an exception while processing it down below.

final byte[] bytes = consumerIterator.next().value();

// it is possible that the message is null
if (bytes == null) {
continue;
}

totalBytesRead.addAndGet(bytes.length);
lastSecBytesReadTmp.addAndGet(bytes.length);

final RawMessage rawMessage = new RawMessage(bytes);

input.processRawMessage(rawMessage);
}
} catch (Exception e) {
LOG.error("Kafka error in consumer thread.", e);
}
}
// explicitly commit our offsets when stopping.
// this might trigger a couple of times, but it won't hurt
consumer.commitAsync();
stopLatch.countDown();
consumer.close();
});
});
}

private void doLaunchLegacy(final MessageInput input) {
final Properties props = new Properties();

props.put("group.id", configuration.getString(CK_GROUP_ID, DEFAULT_GROUP_ID));
@@ -199,6 +320,8 @@ public void run() {
// Set a consumer timeout to avoid blocking on the consumer iterator.
props.put("consumer.timeout.ms", "1000");

insertCustomProperties(props);

final int numThreads = configuration.getInt(CK_THREADS);
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
cc = Consumer.createJavaConsumerConnector(consumerConfig);
@@ -257,7 +380,6 @@ public void run() {

final RawMessage rawMessage = new RawMessage(bytes);

input.processRawMessage(rawMessage);
}
} catch (ConsumerTimeoutException e) {
@@ -274,12 +396,16 @@ public void run() {
}
});
}
}

private void insertCustomProperties(Properties props) {
try {
final Properties customProperties = new Properties();
customProperties.load(new ByteArrayInputStream(configuration.getString(CK_CUSTOM_PROPERTIES, "").getBytes(StandardCharsets.UTF_8)));
props.putAll(customProperties);
} catch (IOException e) {
LOG.error("Failed to read custom properties", e);
}
}

private ExecutorService executorService(int numThreads) {
@@ -340,13 +466,27 @@ public static class Config extends ThrottleableTransport.Config {
public ConfigurationRequest getRequestedConfiguration() {
final ConfigurationRequest cr = super.getRequestedConfiguration();

cr.addField(new BooleanField(CK_LEGACY,
"Legacy mode",
true,
"Use old ZooKeeper-based consumer API. (Used before Graylog 3.3)",
10
));
cr.addField(new TextField(
CK_BOOTSTRAP,
"Bootstrap Servers",
"127.0.0.1:9092",
"Comma separated list of one or more Kafka brokers. (Format: \"host1:port1,host2:port2\")." +
"Not used in legacy mode.",
ConfigurationField.Optional.OPTIONAL,
11));
cr.addField(new TextField(
CK_ZOOKEEPER,
"ZooKeeper address",
"ZooKeeper address (legacy mode only)",
"127.0.0.1:2181",
"Host and port of the ZooKeeper that is managing your Kafka cluster.",
ConfigurationField.Optional.NOT_OPTIONAL));

"Host and port of the ZooKeeper that is managing your Kafka cluster. Not used in consumer API (non-legacy) mode.",
ConfigurationField.Optional.OPTIONAL,
12));
cr.addField(new TextField(
CK_TOPIC_FILTER,
"Topic filter regex",
@@ -380,7 +520,7 @@ public ConfigurationRequest getRequestedConfiguration() {
"Auto offset reset",
DEFAULT_OFFSET_RESET,
OFFSET_RESET_VALUES,
"What to do when there is no initial offset in ZooKeeper or if an offset is out of range",
"What to do when there is no initial offset in Kafka or if an offset is out of range",
ConfigurationField.Optional.OPTIONAL));

cr.addField(new TextField(
@@ -389,8 +529,18 @@
DEFAULT_GROUP_ID,
"Name of the consumer group the Kafka input belongs to",
ConfigurationField.Optional.OPTIONAL));
cr.addField(new TextField(
CK_CUSTOM_PROPERTIES,
"Custom Kafka properties",
"",
"A newline separated list of Kafka properties. (e.g.: \"ssl.keystore.location=/etc/graylog/server/kafka.keystore.jks\").",
ConfigurationField.Optional.OPTIONAL,
110,
TextField.Attribute.TEXTAREA
));

return cr;
}
}
}
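The custom Kafka properties textarea added above is parsed by insertCustomProperties() with java.util.Properties.load(), so each non-empty line is read as one key=value pair and merged over the transport's defaults via putAll(). A minimal, self-contained sketch of that parsing step (the property values are made-up examples; security.protocol is a standard Kafka client setting, not something this PR defines):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class CustomPropertiesSketch {
    public static void main(String[] args) throws IOException {
        // Example textarea content: one Kafka client property per line.
        final String raw = "ssl.keystore.location=/etc/graylog/server/kafka.keystore.jks\n"
                + "security.protocol=SSL";

        final Properties custom = new Properties();
        // Properties.load() parses each line as key=value and skips blank lines
        // and lines starting with '#' or '!'.
        custom.load(new ByteArrayInputStream(raw.getBytes(StandardCharsets.UTF_8)));

        custom.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}

Because insertCustomProperties() runs after the transport sets its own defaults, a property entered here (for example auto.commit.interval.ms) overrides the built-in value.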

ConfigurationRequest.java
@@ -90,6 +90,7 @@ public Map<String, Map<String, Object>> asList() {
config.put("is_optional", f.isOptional().equals(ConfigurationField.Optional.OPTIONAL));
config.put("attributes", f.getAttributes());
config.put("additional_info", f.getAdditionalInformation());
config.put("position", f.getPosition());

configs.put(f.getName(), config);
}
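With this put, every serialized field descriptor now carries its position, so the frontend can order input fields without further API changes. A hedged sketch of the entry the new bootstrap-server field would produce, limited to the keys visible in this hunk (asList() fills in several more before them):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class AsListEntrySketch {
    public static void main(String[] args) {
        // Keys from the hunk above; values follow the CK_BOOTSTRAP TextField
        // defined in KafkaTransport (Optional.OPTIONAL, position 11).
        final Map<String, Object> config = new LinkedHashMap<>();
        config.put("is_optional", true);
        config.put("attributes", Collections.emptyList());
        config.put("additional_info", Collections.emptyMap());
        config.put("position", 11); // new in this PR
        System.out.println(config);
    }
}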
AbstractConfigurationField.java
@@ -26,13 +26,20 @@ public abstract class AbstractConfigurationField implements ConfigurationField {
protected final String humanName;
protected final String description;
protected final ConfigurationField.Optional optional;
private static final int DEFAULT_POSITION = 100; // matches the ConfigurationField.getPosition() default

protected int position;

public AbstractConfigurationField(String field_type, String name, String humanName, String description, ConfigurationField.Optional optional1) {
this.field_type = field_type;
this.name = name;
this.humanName = humanName;
this.description = description;
this.optional = optional1;
this.position = DEFAULT_POSITION;
}
public AbstractConfigurationField(String field_type, String name, String humanName, String description, ConfigurationField.Optional optional1, int position) {
this(field_type, name, humanName, description, optional1);
this.position = position;
}

@Override
@@ -69,4 +76,9 @@ public List<String> getAttributes() {
public Map<String, Map<String, String>> getAdditionalInformation() {
return Collections.emptyMap();
}

@Override
public int getPosition() {
return position;
}
}
BooleanField.java
@@ -26,6 +26,10 @@ public BooleanField(String name, String humanName, boolean defaultValue, String
super(FIELD_TYPE, name, humanName, description, Optional.OPTIONAL);
this.defaultValue = defaultValue;
}
public BooleanField(String name, String humanName, boolean defaultValue, String description, int position) {
super(FIELD_TYPE, name, humanName, description, Optional.OPTIONAL, position);
this.defaultValue = defaultValue;
}

@Override
public Object getDefaultValue() {
ConfigurationField.java
@@ -42,4 +42,8 @@ enum Optional {
List<String> getAttributes();

Map<String, Map<String, String>> getAdditionalInformation();

default int getPosition() {
return 100;
}
}
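The default position of 100 means fields that never specify one sort after the new Kafka mode fields (positions 10–12) and before the custom properties field (110). A small illustrative sketch, not code from this PR, of how a caller could order fields using only the getPosition() contract above:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class FieldOrderingSketch {
    // Stand-in for ConfigurationField, reduced to what matters here.
    interface Positioned {
        String name();
        default int getPosition() { return 100; } // same default as the interface above
    }

    public static void main(String[] args) {
        final List<Positioned> fields = new ArrayList<>();
        fields.add(new Positioned() { public String name() { return "topic_filter"; } }); // default 100
        fields.add(new Positioned() {
            public String name() { return "legacy_mode"; }
            public int getPosition() { return 10; }
        });
        fields.add(new Positioned() {
            public String name() { return "custom_properties"; }
            public int getPosition() { return 110; }
        });

        // List.sort() is stable, so fields sharing the default position 100
        // keep their insertion order relative to each other.
        fields.sort(Comparator.comparingInt(Positioned::getPosition));
        fields.forEach(f -> System.out.println(f.getPosition() + "  " + f.name()));
    }
}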
TextField.java
@@ -52,6 +52,16 @@ public TextField(String name, String humanName, String defaultValue, String desc
}
}
}
public TextField(String name, String humanName, String defaultValue, String description, Optional isOptional, int position, Attribute... attrs) {
super(FIELD_TYPE, name, humanName, description, isOptional, position);
this.defaultValue = defaultValue;
this.attributes = Lists.newArrayList();
if (attrs != null) {
for (Attribute attribute : attrs) {
this.attributes.add(attribute.toString().toLowerCase(Locale.ENGLISH));
}
}
}

@Override
public Object getDefaultValue() {
graylog2-server/src/main/resources/log4j2.xml (1 addition, 0 deletions)
@@ -23,6 +23,7 @@
<!-- Silence Kafka log chatter -->
<Logger name="kafka.log.Log" level="warn"/>
<Logger name="kafka.log.OffsetIndex" level="warn"/>
<Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="warn"/>
<!-- Silence useless session validation messages -->
<Logger name="org.apache.shiro.session.mgt.AbstractValidatingSessionManager" level="warn"/>
<Root level="warn">