-
Notifications
You must be signed in to change notification settings - Fork 468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional integration tests for multistream and cross account #1313
Conversation
@@ -45,8 +48,9 @@ | |||
public class TestConsumer { | |||
public final KCLAppConfig consumerConfig; | |||
public final Region region; | |||
public final String streamName; | |||
public final List<String> streamNames; | |||
public final KinesisAsyncClient kinesisClient; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we've introduced an additional Kinesis client, should we also rename this original client to something more specific for added clarity, i.e. kinesisClientForConsumerApplication
?
@@ -67,24 +71,30 @@ public class TestConsumer { | |||
public TestConsumer(KCLAppConfig consumerConfig) throws Exception { | |||
this.consumerConfig = consumerConfig; | |||
this.region = consumerConfig.getRegion(); | |||
this.streamName = consumerConfig.getStreamName(); | |||
this.kinesisClient = consumerConfig.buildAsyncKinesisClient(); | |||
this.streamNames = consumerConfig.getStreamNames(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like we are no longer reading this variable anywhere, now we just call consumerConfig.getStreamNames()
directly
if we plan on always retrieving the stream names from consumerConfig
, should we remove this variable altogether?
private void verifyCrossAccountCreds() { | ||
if (consumerConfig.getCrossAccountCredentialsProvider() == null) { | ||
throw new RuntimeException("To run cross account integration tests, pass in an AWS profile with -D" + | ||
KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think ideally the credentials providers should be abstracted away from this class
i left a comment about potentially making the getCrossAccountCredentialsProvider
method protected
and renaming it to getCredentialsProviderForStreamOwner
we can likely remove this logic in that approach if you find it suitable
for (int i = 0; i < consumerConfig.getReshardFactorList() | ||
.size(); i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit/optional: the added whitespace seems slightly unconventional here
if (consumerConfig.getRetrievalMode().equals(RetrievalMode.POLLING)) { | ||
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, null); | ||
} else if (consumerConfig.isCrossAccount()) { | ||
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, | ||
streamToConsumerArnsMap); | ||
} else { | ||
retrievalConfig = configsBuilder.retrievalConfig(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we simplify this to:
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, streamToConsumerArnsMap);
looks like the KCLAppConfig#getRetrievalConfig
method already contains most of the necessary logic, might just need to move the isCrossAccount
check to there to achieve the same behavior?
public TestRecordProcessor(StreamIdentifier streamIdentifier, RecordValidatorQueue recordValidator) { | ||
this.recordValidator = recordValidator; | ||
this.streamIdentifier = streamIdentifier; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: can replace with @RequiredArgsConstructor
log.info("Processing record pk: {}", data); | ||
recordValidator.add(shardId, data); | ||
log.info("Processing record pk for stream {}: {}", streamIdentifier.streamName(), data); | ||
String recordValidatorKey = streamIdentifier.toString() + "-" + shardId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final
; can remove explicit toString()
public ShardRecordProcessor shardRecordProcessor() { | ||
return new TestRecordProcessor(this.recordValidator); | ||
return new TestRecordProcessor(null, this.recordValidator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we ever expect this method to be called now that we're implementing ShardRecordProcessorFactory#shardRecordProcessor(StreamIdentifier)
?
if not, should we just throw instead?
public List<String> getStreamNames() { | ||
if (this.streamNames == null) { | ||
return getStreamArns().stream().map(streamArn -> | ||
streamArn.toString().substring(streamArn.toString().indexOf("/") + 1)) | ||
.collect(Collectors.toList()); | ||
} else { | ||
return this.streamNames; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think we ever update the streamNames
field so looks like the second branch would be unreachable here?
public AwsCredentialsProvider getCrossAccountCredentialsProvider() { | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be cleaner for this to be a protected
method called getCredentialsProviderForStreamOwner
?
in KCLAppConfig
class, this can provide the same credentials as the getCredentialsProvider
method
then the inherited KCLCrossAccountAppConfig
class can override this getCredentialsProviderForStreamOwner
method with different credentials
might be slightly more intuitive and simplify some of the other logic such as in buildAsyncKinesisClientForStreamOwner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: there's several singleton-pattern usages for setting/getting the fields in this class, wondering if we can just use final
fields instead and initialize them in a constructor
} | ||
} | ||
|
||
public abstract String getTestName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can optionally just define this method here in the parent class using some string manipulation on this.getClass().getSimpleName()
since all the test names just seem to be shortened versions of the inherited class names anyway
public String getApplicationName() { | ||
return INTEGRATION_TEST_RESOURCE_PREFIX + getTestName(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think applicationName
needs to be made unique now since we're updating the test logic to use applicationName
instead of streamName
for the lease table
catch (Exception e) { | ||
log.error("Error when getting account ID through STS for consumer", e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it beneficial to be swallowing the exception and returning null
?
or would it be better to just throw an exception and fast-fail?
return config; | ||
} | ||
|
||
public Arn buildStreamArn(String streamName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: protected final
?
@@ -16,9 +14,17 @@ public class ReleaseCanaryPollingH1TestConfig extends KCLAppConfig { | |||
|
|||
private final UUID uniqueId = UUID.randomUUID(); | |||
|
|||
private final String applicationName = "PollingH1Test"; | |||
private final String streamName = "2XPollingH1TestStream_" + uniqueId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't think stream names need to be unique anymore since they're no longer being used as the lease table names, but should be fine to keep this approach if you think it'd be preferable
private final int numStreams = 2; | ||
private final String applicationName = "CrossAccountMultiStreamPollingH2Test"; | ||
|
||
private final String streamName = "2XCrossAccountPollingH2TestStream"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can be static
constants; here and elsewhere
ArrayList<Arn> streamArns = new ArrayList<>(numStreams); | ||
for (int i = 1; i <= numStreams; i++) { | ||
streamArns.add(buildStreamArn(String.join("_", streamName, Integer.toString(i), uniqueId.toString()))); | ||
} | ||
return streamArns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seeing this pattern for all the multi-stream test configs, can optionally use a common helper method
+ "\"Effect\": \"Allow\"," | ||
+ "\"Principal\": {\"AWS\": \"" + accountId + "\"}," | ||
+ "\"Action\": [" | ||
+ "\"kinesis:DescribeStreamConsumer\",\"kinesis:SubscribeToShard\"]," |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can optionally extract the remainder of this string into common code to reduce duplication
Will revisit these refactoring changes in favor of improving our testing suite |
Issue #, if available:
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.