-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: kafka header extraction expression in composite zilla.yaml #1362
Conversation
if (matcher.reset(path).matches()) | ||
{ | ||
this.extractHeaders.add(new KafkaTopicHeaderType(name, | ||
String.format(INTERNAL_VALUE, matcher.group(2)))); | ||
String.format(INTERNAL_VALUE, matcher.group(2)), externalPath)); | ||
} | ||
else if (internalMatcher.reset(path).matches()) | ||
{ | ||
this.extractHeaders.add(new KafkaTopicHeaderType(name, path)); | ||
this.extractHeaders.add(new KafkaTopicHeaderType(name, path, externalPath)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we build the KafkaTopicTransformsConfig
there should be no matcher used, just capture the parsed values directly and nothing else.
In constructor of KafkaTopicType
, where we prepare the KafkaTopicConfig
for use in the binding for each core, that's where the parsed config transforms should be converted as needed to something like KafkaTopicTransformType
using matcher etc, instead of using parsed transforms config directly.
private static final String PATH = "^\\$\\{message\\.(key|value)\\.([A-Za-z_][A-Za-z0-9_]*)\\}$"; | ||
private static final Pattern PATH_PATTERN = Pattern.compile(PATH); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Matcher is behavioral and therefore not in the right place for config parsing logic.
This class should reflect exactly what is parsed, already protected for valid values by schema as needed.
else if (internalMatcher.reset(path).matches()) | ||
{ | ||
this.extractHeaders.add(new KafkaTopicHeaderType(name, path, externalPath)); | ||
this.extractHeaders.add(new KafkaTopicHeaderType(name, path)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaTopicHeaderType
should be called KafkaTopicHeaderConfig
, since it represents parsed config.
List<KafkaTopicHeaderType> transformHeaders = Optional.ofNullable(transforms.extractHeaders) | ||
.orElse(emptyList()) | ||
.stream() | ||
.filter(header -> matcher.reset(header.path).matches()) | ||
.map(header -> new KafkaTopicHeaderType(header.name, | ||
String.format(TRANSFORM_INTERNAL_PATH, matcher.group(2)))) | ||
.toList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good, but KafkaTopicHeaderType
should be in internal config (leaving KafkaTopicHeaderConfig
in public config
package for use by builder (also in public config package).
Fixes #1138