Skip to content

Commit 00389ae

Browse files
committed
'Version 1.2.0 of the DynamoDB Streams Kinesis Adapter'
1 parent 188e38b commit 00389ae

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1260
-1097
lines changed

META-INF/MANIFEST.MF

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
22
Bundle-ManifestVersion: 2
33
Bundle-Name: Amazon DynamoDB Streams Kinesis Adapter for Java
44
Bundle-SymbolicName: com.amazonaws.dynamodb.streams.kinesis.adapter;singleton:=true
5-
Bundle-Version: 1.1.1
5+
Bundle-Version: 1.2.0
66
Bundle-Vendor: Amazon Technologies, Inc
77
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
88
Export-Package: com.amazonaws.services.dynamodbv2.streamsadapter,

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
* The KCL is designed to process streams from Amazon Kinesis, but by adding the DynamoDB Streams Kinesis Adapter, your application can process DynamoDB Streams instead, seamlessly and efficiently.
1515

1616
## Release Notes
17-
* Requires Kinesis Client Library version >= 1.6.4.
18-
* Requires AWS Java SDK version >= 1.11.14
17+
* Requires Kinesis Client Library version >= 1.7.5.
18+
* Requires AWS Java SDK version >= 1.11.115
1919
* It is highly recommended to [configure][kcl-configuration] Kinesis Client Library with `MaxRecords = 1000` and `IdleTimeInMillis = 500` to optimize DynamoDB Streams costs.
2020

2121
## Getting Started

pom.xml

+7-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
77
<packaging>jar</packaging>
88
<name>DynamoDB Streams Adapter for Java</name>
9-
<version>1.1.1</version>
9+
<version>1.2.0</version>
1010
<description>The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.</description>
1111
<url>https://aws.amazon.com/dynamodb</url>
1212

@@ -23,8 +23,8 @@
2323
</licenses>
2424

2525
<properties>
26-
<aws-java-sdk.version>[1.11.14, 2.0.0)</aws-java-sdk.version>
27-
<amazon-kinesis-client.version>[1.6.4, 1.7.0)</amazon-kinesis-client.version>
26+
<aws-java-sdk.version>[1.11.115, 2.0.0)</aws-java-sdk.version>
27+
<amazon-kinesis-client.version>[1.7.5, 1.8.0)</amazon-kinesis-client.version>
2828
<gpg.skip>true</gpg.skip>
2929
</properties>
3030

@@ -112,6 +112,10 @@
112112
<name>Lei Ye</name>
113113
<email>leiye@amazon.com</email>
114114
</developer>
115+
<developer>
116+
<name>Alexander Patrikalakis</name>
117+
<email>amcp@amazon.co.jp</email>
118+
</developer>
115119
</developers>
116120

117121
<build>

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AdapterRequestCache.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@ public AdapterRequestCache(int capacity) {
4747
/**
4848
* Adds an entry to the cache.
4949
*
50-
* @param request
51-
* Kinesis request
52-
* @param requestAdapter
53-
* DynamoDB adapter client wrapper for the Kinesis request
50+
* @param request Kinesis request
51+
* @param requestAdapter DynamoDB adapter client wrapper for the Kinesis request
5452
*/
5553
public synchronized void addEntry(AmazonWebServiceRequest request, AmazonWebServiceRequest requestAdapter) {
5654
if (null == request || null == requestAdapter) {
@@ -67,8 +65,7 @@ public synchronized void addEntry(AmazonWebServiceRequest request, AmazonWebServ
6765
/**
6866
* Gets the actual DynamoDB Streams request made for a Kinesis request.
6967
*
70-
* @param request
71-
* Kinesis request
68+
* @param request Kinesis request
7269
* @return actual DynamoDB Streams request made for the associated Kinesis request
7370
*/
7471
public synchronized AmazonWebServiceRequest getEntry(AmazonWebServiceRequest request) {

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/AmazonDynamoDBStreamsAdapterClient.java

+83-134
Large diffs are not rendered by default.

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsMultiLangDaemon.java

+6-10
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* Main app that launches the worker that runs the multi-language record processor.
3131
* Requires a properties file containing configuration for this daemon and the KCL.
32-
*
32+
* <p>
3333
* This version extends the KCL's MultiLangDaemon to use DynamoDB Streams instead of
3434
* Kinesis.
3535
*/
@@ -39,7 +39,7 @@ public class StreamsMultiLangDaemon {
3939

4040
/**
4141
* @param args Accepts a single argument, that argument is a properties file which provides KCL configuration as
42-
* well as the name of an executable.
42+
* well as the name of an executable.
4343
*/
4444
public static void main(String[] args) {
4545

@@ -55,17 +55,13 @@ public static void main(String[] args) {
5555
System.exit(1);
5656
}
5757

58-
ExecutorService executorService = config.getExecutorService();
59-
60-
Worker worker = new StreamsWorker(
61-
config.getRecordProcessorFactory(),
62-
config.getKinesisClientLibConfiguration(),
63-
executorService);
58+
final ExecutorService executorService = config.getExecutorService();
59+
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(config.getRecordProcessorFactory(), config.getKinesisClientLibConfiguration(), executorService);
6460

6561
// Daemon
66-
MultiLangDaemon daemon = new MultiLangDaemon(worker);
62+
final MultiLangDaemon daemon = new MultiLangDaemon(worker);
6763

68-
Future<Integer> future = executorService.submit(daemon);
64+
final Future<Integer> future = executorService.submit(daemon);
6965
try {
7066
System.exit(future.get());
7167
} catch (InterruptedException | ExecutionException e) {

src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsRecordProcessor.java

+22-15
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717
import java.util.ArrayList;
1818
import java.util.List;
1919

20+
import org.apache.commons.logging.Log;
21+
import org.apache.commons.logging.LogFactory;
22+
2023
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
21-
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
2224
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
23-
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
25+
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
26+
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
27+
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
28+
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
2429
import com.amazonaws.services.kinesis.model.Record;
2530

2631
/**
@@ -30,17 +35,21 @@
3035
*/
3136
public abstract class StreamsRecordProcessor implements IRecordProcessor {
3237

38+
private static final Log LOG = LogFactory.getLog(StreamsRecordProcessor.class);
39+
3340
/**
3441
* {@inheritDoc}
3542
*/
36-
public abstract void initialize(String shardId);
43+
public abstract void initialize(InitializationInput initializationInput);
3744

38-
public void processRecords(List<Record> records,
39-
IRecordProcessorCheckpointer checkpointer) {
40-
List<com.amazonaws.services.dynamodbv2.model.Record> streamsRecords =
41-
new ArrayList<com.amazonaws.services.dynamodbv2.model.Record>();
42-
for(Record record : records) {
43-
if(record instanceof RecordAdapter) {
45+
public void processRecords(ProcessRecordsInput processRecordsInput) {
46+
final List<com.amazonaws.services.dynamodbv2.model.Record> streamsRecords = new ArrayList<com.amazonaws.services.dynamodbv2.model.Record>();
47+
if (processRecordsInput.getRecords() == null) {
48+
LOG.warn("ProcessRecordsInput's list of Records was null. Skipping.");
49+
return;
50+
}
51+
for (Record record : processRecordsInput.getRecords()) {
52+
if (record instanceof RecordAdapter) {
4453
streamsRecords.add(((RecordAdapter) record).getInternalObject());
4554
} else {
4655
// This record processor is not being used with the
@@ -49,7 +58,7 @@ public void processRecords(List<Record> records,
4958
throw new IllegalArgumentException("Record is not an instance of RecordAdapter");
5059
}
5160
}
52-
processStreamsRecords(streamsRecords, checkpointer);
61+
processStreamsRecords(streamsRecords, processRecordsInput.getCheckpointer());
5362
}
5463

5564
/**
@@ -58,16 +67,14 @@ public void processRecords(List<Record> records,
5867
* Upon fail over, the new instance will get records with sequence number &gt; checkpoint position
5968
* for each partition key.
6069
*
61-
* @param records Data records to be processed
70+
* @param records Data records to be processed
6271
* @param checkpointer RecordProcessor should use this instance to checkpoint their progress.
6372
*/
64-
public abstract void processStreamsRecords(List<com.amazonaws.services.dynamodbv2.model.Record> records,
65-
IRecordProcessorCheckpointer checkpointer);
73+
public abstract void processStreamsRecords(List<com.amazonaws.services.dynamodbv2.model.Record> records, IRecordProcessorCheckpointer checkpointer);
6674

6775
/**
6876
* {@inheritDoc}
6977
*/
70-
public abstract void shutdown(IRecordProcessorCheckpointer checkpointer,
71-
ShutdownReason reason);
78+
public abstract void shutdown(ShutdownInput shutdownInput);
7279

7380
}

0 commit comments

Comments
 (0)