Skip to content

Commit

Permalink
Load consumer properties from file
Browse files Browse the repository at this point in the history
This change allows to define a custom properties file that will be loaded and overwrites the default configuration of the KafkaTransport while also allowing to set other Consumer properties (like TLS configuration certs and passwords).

The order of the properties also changes to give default values the lowest precedence, then the properties from the consumer properties file are loaded to override default values, and later input configuration to override consumer properties. Consumer `client.id` is also moved to the latest stage to avoid being overriden

Empty values for the consumer properties file are ignored.

Failing to read the consumer properties file stops the KafkaTransport in the assumption that missing properties could affect the connection in unexpected ways.

Signed-off-by: Joaquin Sargiotto <joaquinsargiotto@gmail.com>
  • Loading branch information
jsargiot committed Sep 7, 2018
1 parent 5f00f3d commit 0bfa2c1
Showing 1 changed file with 39 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -79,6 +82,7 @@ public class KafkaTransport extends ThrottleableTransport {
public static final String CK_TOPIC_FILTER = "topic_filter";
public static final String CK_THREADS = "threads";
public static final String CK_OFFSET_RESET = "offset_reset";
public static final String CK_CONSUMER_PROPERTIES = "consumer_properties";

// See https://kafka.apache.org/090/documentation.html for available values for "auto.offset.reset".
private static final Map<String, String> OFFSET_RESET_VALUES = ImmutableMap.of(
Expand Down Expand Up @@ -185,19 +189,29 @@ public void run() {

final Properties props = new Properties();

// Add default values to properties so they can be overrided later
props.put("group.id", GROUP_ID);
props.put("client.id", "gl2-" + nodeId + "-" + input.getId());

props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
props.put("fetch.wait.max.ms", String.valueOf(configuration.getInt(CK_FETCH_WAIT_MAX)));
props.put("zookeeper.connect", configuration.getString(CK_ZOOKEEPER));
props.put("auto.offset.reset", configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));
// Default auto commit interval is 60 seconds. Reduce to 1 second to minimize message duplication
// if something breaks.
props.put("auto.commit.interval.ms", "1000");
// Set a consumer timeout to avoid blocking on the consumer iterator.
props.put("consumer.timeout.ms", "1000");

try {
// Overwrite defaults with consumer properties config file (if exists)
props.putAll(getConsumerPropertiesFromFile(configuration, CK_CONSUMER_PROPERTIES));
} catch (IOException e) {
doStop();
throw new MisfireException("Unable to read consumer properties file.", e);
}

// Overwrite everything with Input configuration
props.put("client.id", "gl2-" + nodeId + "-" + input.getId());
props.put("fetch.min.bytes", String.valueOf(configuration.getInt(CK_FETCH_MIN_BYTES)));
props.put("fetch.wait.max.ms", String.valueOf(configuration.getInt(CK_FETCH_WAIT_MAX)));
props.put("zookeeper.connect", configuration.getString(CK_ZOOKEEPER));
props.put("auto.offset.reset", configuration.getString(CK_OFFSET_RESET, DEFAULT_OFFSET_RESET));

final int numThreads = configuration.getInt(CK_THREADS);
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
cc = Consumer.createJavaConsumerConnector(consumerConfig);
Expand Down Expand Up @@ -281,6 +295,18 @@ public void run() {
}, 1, 1, TimeUnit.SECONDS);
}

private Properties getConsumerPropertiesFromFile(Configuration configuration, String configKey) throws IOException {
Properties props = new Properties();
String consumerPropertiesPath = configuration.getString(configKey);

if (consumerPropertiesPath != null && consumerPropertiesPath != "") {
File propertiesFile = new File(consumerPropertiesPath);
props.load(new FileInputStream(propertiesFile));
}

return props;
}

private ExecutorService executorService(int numThreads) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-transport-%d").build();
return new InstrumentedExecutorService(
Expand Down Expand Up @@ -382,6 +408,13 @@ public ConfigurationRequest getRequestedConfiguration() {
"What to do when there is no initial offset in ZooKeeper or if an offset is out of range",
ConfigurationField.Optional.OPTIONAL));

cr.addField(new TextField(
CK_CONSUMER_PROPERTIES,
"Consumer properties file",
"",
"Path to consumer properties file.",
ConfigurationField.Optional.OPTIONAL));

return cr;
}
}
Expand Down

0 comments on commit 0bfa2c1

Please sign in to comment.