-
Notifications
You must be signed in to change notification settings - Fork 468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional integration tests for multistream and cross account #1313
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.commons.lang3.RandomStringUtils; | ||
import software.amazon.awssdk.core.SdkBytes; | ||
import software.amazon.awssdk.arns.Arn; | ||
import software.amazon.awssdk.regions.Region; | ||
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; | ||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; | ||
|
@@ -18,6 +19,7 @@ | |
import software.amazon.kinesis.common.ConfigsBuilder; | ||
import software.amazon.kinesis.common.InitialPositionInStreamExtended; | ||
import software.amazon.kinesis.config.KCLAppConfig; | ||
import software.amazon.kinesis.config.RetrievalMode; | ||
import software.amazon.kinesis.coordinator.CoordinatorConfig; | ||
import software.amazon.kinesis.coordinator.Scheduler; | ||
import software.amazon.kinesis.leases.LeaseManagementConfig; | ||
|
@@ -33,6 +35,7 @@ | |
import java.math.BigInteger; | ||
import java.nio.ByteBuffer; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
|
@@ -45,8 +48,9 @@ | |
public class TestConsumer { | ||
public final KCLAppConfig consumerConfig; | ||
public final Region region; | ||
public final String streamName; | ||
public final List<String> streamNames; | ||
public final KinesisAsyncClient kinesisClient; | ||
public final KinesisAsyncClient kinesisClientForStreamOwner; | ||
private MetricsConfig metricsConfig; | ||
private RetrievalConfig retrievalConfig; | ||
private CheckpointConfig checkpointConfig; | ||
|
@@ -67,24 +71,30 @@ public class TestConsumer { | |
public TestConsumer(KCLAppConfig consumerConfig) throws Exception { | ||
this.consumerConfig = consumerConfig; | ||
this.region = consumerConfig.getRegion(); | ||
this.streamName = consumerConfig.getStreamName(); | ||
this.kinesisClient = consumerConfig.buildAsyncKinesisClient(); | ||
this.streamNames = consumerConfig.getStreamNames(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like we are no longer reading this variable anywhere, now we just call |
||
this.kinesisClientForStreamOwner = consumerConfig.buildAsyncKinesisClientForStreamOwner(); | ||
this.kinesisClient = consumerConfig.buildAsyncKinesisClientForConsumer(); | ||
this.dynamoClient = consumerConfig.buildAsyncDynamoDbClient(); | ||
} | ||
|
||
public void run() throws Exception { | ||
|
||
if (consumerConfig.isCrossAccount()) { | ||
verifyCrossAccountCreds(); | ||
} | ||
|
||
final StreamExistenceManager streamExistenceManager = new StreamExistenceManager(this.consumerConfig); | ||
final LeaseTableManager leaseTableManager = new LeaseTableManager(this.dynamoClient); | ||
|
||
// Clean up any old streams or lease tables left in test environment | ||
cleanTestResources(streamExistenceManager, leaseTableManager); | ||
|
||
// Check if stream is created. If not, create it | ||
streamExistenceManager.checkStreamAndCreateIfNecessary(this.streamName); | ||
streamExistenceManager.checkStreamsAndCreateIfNecessary(); | ||
Map<Arn, Arn> streamToConsumerArnsMap = streamExistenceManager.createCrossAccountConsumerIfNecessary(); | ||
|
||
startProducer(); | ||
setUpConsumerResources(); | ||
setUpConsumerResources(streamToConsumerArnsMap); | ||
|
||
try { | ||
startConsumer(); | ||
|
@@ -116,6 +126,13 @@ public void run() throws Exception { | |
} | ||
} | ||
|
||
private void verifyCrossAccountCreds() { | ||
if (consumerConfig.getCrossAccountCredentialsProvider() == null) { | ||
throw new RuntimeException("To run cross account integration tests, pass in an AWS profile with -D" + | ||
KCLAppConfig.CROSS_ACCOUNT_PROFILE_PROPERTY); | ||
} | ||
} | ||
Comment on lines
+129
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think ideally the credentials providers should be abstracted away from this class |
||
|
||
private void cleanTestResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { | ||
log.info("----------Before starting, Cleaning test environment----------"); | ||
log.info("----------Deleting all lease tables in account----------"); | ||
|
@@ -135,25 +152,35 @@ private void startProducer() { | |
if (consumerConfig.getReshardFactorList() != null) { | ||
log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList()); | ||
|
||
final StreamScaler s = new StreamScaler( | ||
kinesisClient, | ||
consumerConfig.getStreamName(), | ||
consumerConfig.getReshardFactorList(), | ||
consumerConfig | ||
); | ||
for (String streamName : consumerConfig.getStreamNames()) { | ||
final StreamScaler streamScaler = new StreamScaler(kinesisClientForStreamOwner, streamName, | ||
consumerConfig.getReshardFactorList(), consumerConfig); | ||
|
||
// Schedule the stream scales 4 minutes apart with 2 minute starting delay | ||
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { | ||
producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); | ||
// Schedule the stream scales 4 minutes apart with 2 minute starting delay | ||
for (int i = 0; i < consumerConfig.getReshardFactorList() | ||
.size(); i++) { | ||
Comment on lines
+160
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit/optional: the added whitespace seems slightly unconventional here |
||
producerExecutor.schedule(streamScaler, (4 * i) + 2, TimeUnit.MINUTES); | ||
} | ||
} | ||
} | ||
} | ||
|
||
private void setUpConsumerResources() throws Exception { | ||
private void setUpConsumerResources(Map<Arn, Arn> streamToConsumerArnsMap) throws Exception { | ||
// Setup configuration of KCL (including DynamoDB and CloudWatch) | ||
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(); | ||
final ConfigsBuilder configsBuilder = consumerConfig.getConfigsBuilder(streamToConsumerArnsMap); | ||
|
||
// For polling mode in both CAA and non CAA, set retrievalSpecificConfig to use PollingConfig | ||
// For SingleStreamMode EFO CAA, must set the retrieval config to specify the consumerArn in FanoutConfig | ||
// For MultiStream EFO CAA, the consumerArn can be set in StreamConfig | ||
if (consumerConfig.getRetrievalMode().equals(RetrievalMode.POLLING)) { | ||
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, null); | ||
} else if (consumerConfig.isCrossAccount()) { | ||
retrievalConfig = consumerConfig.getRetrievalConfig(configsBuilder, | ||
streamToConsumerArnsMap); | ||
} else { | ||
retrievalConfig = configsBuilder.retrievalConfig(); | ||
} | ||
Comment on lines
+175
to
+182
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we simplify this to:
looks like the |
||
|
||
retrievalConfig = consumerConfig.getRetrievalConfig(); | ||
checkpointConfig = configsBuilder.checkpointConfig(); | ||
coordinatorConfig = configsBuilder.coordinatorConfig(); | ||
leaseManagementConfig = configsBuilder.leaseManagementConfig() | ||
|
@@ -194,23 +221,27 @@ private void stopProducer() { | |
} | ||
|
||
public void publishRecord() { | ||
final PutRecordRequest request; | ||
try { | ||
request = PutRecordRequest.builder() | ||
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) | ||
.streamName(this.streamName) | ||
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 is 1 KB | ||
.build(); | ||
kinesisClient.putRecord(request).get(); | ||
|
||
// Increment the payload counter if the putRecord call was successful | ||
payloadCounter = payloadCounter.add(new BigInteger("1")); | ||
successfulPutRecords += 1; | ||
log.info("---------Record published, successfulPutRecords is now: {}", successfulPutRecords); | ||
} catch (InterruptedException e) { | ||
log.info("Interrupted, assuming shutdown. ", e); | ||
} catch (ExecutionException | RuntimeException e) { | ||
log.error("Error during publish records", e); | ||
for (String streamName : consumerConfig.getStreamNames()) { | ||
try { | ||
final PutRecordRequest request = PutRecordRequest.builder() | ||
.partitionKey(RandomStringUtils.randomAlphabetic(5, 20)) | ||
.streamName(streamName) | ||
.data(SdkBytes.fromByteBuffer(wrapWithCounter(5, payloadCounter))) // 1024 | ||
// is 1 KB | ||
.build(); | ||
Comment on lines
+227
to
+231
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: alignment seems slightly unconventional, can maybe just use single-tab indentation here? |
||
kinesisClientForStreamOwner.putRecord(request) | ||
.get(); | ||
|
||
// Increment the payload counter if the putRecord call was successful | ||
payloadCounter = payloadCounter.add(new BigInteger("1")); | ||
successfulPutRecords += 1; | ||
log.info("---------Record published for stream {}, successfulPutRecords is now: {}", | ||
streamName, successfulPutRecords); | ||
} catch (InterruptedException e) { | ||
log.info("Interrupted, assuming shutdown. ", e); | ||
} catch (ExecutionException | RuntimeException e) { | ||
log.error("Error during publish records", e); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -248,10 +279,13 @@ private void validateRecordProcessor() throws Exception { | |
} | ||
|
||
private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { | ||
log.info("-------------Start deleting stream.---------"); | ||
streamExistenceManager.deleteResource(this.streamName); | ||
log.info("-------------Start deleting streams.---------"); | ||
for (String streamName : consumerConfig.getStreamNames()) { | ||
log.info("Deleting stream {}", streamName); | ||
streamExistenceManager.deleteResource(streamName); | ||
} | ||
log.info("---------Start deleting lease table.---------"); | ||
leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); | ||
leaseTableManager.deleteResource(consumerConfig.getApplicationName()); | ||
log.info("---------Finished deleting resources.---------"); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import lombok.extern.slf4j.Slf4j; | ||
import org.slf4j.MDC; | ||
import software.amazon.kinesis.common.StreamIdentifier; | ||
import software.amazon.kinesis.exceptions.InvalidStateException; | ||
import software.amazon.kinesis.exceptions.ShutdownException; | ||
import software.amazon.kinesis.lifecycle.events.LeaseLostInput; | ||
|
@@ -23,12 +24,15 @@ public class TestRecordProcessor implements ShardRecordProcessor { | |
|
||
private static final String SHARD_ID_MDC_KEY = "ShardId"; | ||
|
||
private StreamIdentifier streamIdentifier; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
|
||
private String shardId; | ||
|
||
private final RecordValidatorQueue recordValidator; | ||
|
||
public TestRecordProcessor(RecordValidatorQueue recordValidator) { | ||
public TestRecordProcessor(StreamIdentifier streamIdentifier, RecordValidatorQueue recordValidator) { | ||
this.recordValidator = recordValidator; | ||
this.streamIdentifier = streamIdentifier; | ||
} | ||
Comment on lines
+33
to
36
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional: can replace with |
||
|
||
@Override | ||
|
@@ -51,8 +55,9 @@ public void processRecords(ProcessRecordsInput processRecordsInput) { | |
|
||
for (KinesisClientRecord kinesisRecord : processRecordsInput.records()) { | ||
final String data = new String(asByteArray(kinesisRecord.data())); | ||
log.info("Processing record pk: {}", data); | ||
recordValidator.add(shardId, data); | ||
log.info("Processing record pk for stream {}: {}", streamIdentifier.streamName(), data); | ||
String recordValidatorKey = streamIdentifier.toString() + "-" + shardId; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
recordValidator.add(recordValidatorKey, data); | ||
} | ||
|
||
} catch (Throwable t) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
package software.amazon.kinesis.application; | ||
|
||
import software.amazon.kinesis.common.StreamIdentifier; | ||
import software.amazon.kinesis.processor.ShardRecordProcessor; | ||
import software.amazon.kinesis.processor.ShardRecordProcessorFactory; | ||
import software.amazon.kinesis.utils.RecordValidatorQueue; | ||
|
@@ -14,7 +15,12 @@ public TestRecordProcessorFactory(RecordValidatorQueue recordValidator) { | |
|
||
@Override | ||
public ShardRecordProcessor shardRecordProcessor() { | ||
return new TestRecordProcessor(this.recordValidator); | ||
return new TestRecordProcessor(null, this.recordValidator); | ||
Comment on lines
17
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we ever expect this method to be called now that we're implementing |
||
} | ||
|
||
@Override | ||
public ShardRecordProcessor shardRecordProcessor(StreamIdentifier streamIdentifier) { | ||
return new TestRecordProcessor(streamIdentifier, this.recordValidator); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we've introduced an additional Kinesis client, should we also rename this original client to something more specific for added clarity, i.e.
kinesisClientForConsumerApplication
?