Skip to content

Commit 6147d44

Browse files
authored
Aggregation option in Kinesis Writer Python sdk (#34323)
* 👍 Add aggregation feature * 🐛 Fix import bug * ✅ Add test * 👍 Add 3 argument * 💡 Add comment * 🎨 Format code
1 parent b5358c3 commit 6147d44

File tree

3 files changed

+75
-1
lines changed

3 files changed

+75
-1
lines changed

sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ private abstract static class CrossLanguageConfiguration {
6464
String region;
6565
@Nullable String serviceEndpoint;
6666
boolean verifyCertificate;
67+
boolean aggregationEnabled;
68+
Integer aggregationMaxBytes;
69+
Duration aggregationMaxBufferedTime;
70+
Duration aggregationShardRefreshInterval;
6771

6872
public void setStreamName(String streamName) {
6973
this.streamName = streamName;
@@ -88,6 +92,23 @@ public void setServiceEndpoint(@Nullable String serviceEndpoint) {
8892
public void setVerifyCertificate(@Nullable Boolean verifyCertificate) {
8993
this.verifyCertificate = verifyCertificate == null || verifyCertificate;
9094
}
95+
96+
public void setAggregationEnabled(@Nullable Boolean aggregationEnabled) {
97+
this.aggregationEnabled = aggregationEnabled != null && aggregationEnabled;
98+
}
99+
100+
public void setAggregationMaxBytes(Long aggregationMaxBytes) {
101+
this.aggregationMaxBytes = aggregationMaxBytes.intValue();
102+
}
103+
104+
public void setAggregationMaxBufferedTime(Long aggregationMaxBufferedTime) {
105+
this.aggregationMaxBufferedTime = Duration.millis(aggregationMaxBufferedTime);
106+
}
107+
108+
public void setAggregationShardRefreshInterval(Long aggregationShardRefreshInterval) {
109+
this.aggregationShardRefreshInterval =
110+
Duration.standardMinutes(aggregationShardRefreshInterval);
111+
}
91112
}
92113

93114
public static class WriteBuilder
@@ -131,9 +152,22 @@ public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
131152
.skipCertificateVerification(!configuration.verifyCertificate)
132153
.build())
133154
.withPartitioner(p -> pk)
134-
.withRecordAggregationDisabled()
135155
.withSerializer(serializer);
136156

157+
if (configuration.aggregationEnabled) {
158+
writeTransform =
159+
writeTransform.withRecordAggregation(
160+
KinesisIO.RecordAggregation.builder()
161+
.maxBytes(configuration.aggregationMaxBytes)
162+
.maxBufferedTimeJitter(0.7) // 70% jitter
163+
.maxBufferedTime(configuration.aggregationMaxBufferedTime)
164+
.shardRefreshIntervalJitter(0.5) // 50% jitter
165+
.shardRefreshInterval(configuration.aggregationShardRefreshInterval)
166+
.build());
167+
} else {
168+
writeTransform = writeTransform.withRecordAggregationDisabled();
169+
}
170+
137171
return writeTransform;
138172
}
139173
}

sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py

+20
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ def test_kinesis_write(self):
9595
# TODO: remove this test once
9696
# https://github.com/apache/beam/issues/20416 is resolved
9797
self.run_kinesis_write()
98+
self.run_kinesis_write_with_aggregation()
9899
records = self.kinesis_helper.read_from_stream(self.aws_kinesis_stream)
99100
self.assertEqual(
100101
sorted(records),
@@ -118,6 +119,25 @@ def run_kinesis_write(self):
118119
verify_certificate=(not self.use_localstack),
119120
partition_key='1'))
120121

122+
def run_kinesis_write_with_aggregation(self):
123+
with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
124+
p.not_use_test_runner_api = True
125+
_ = (
126+
p
127+
| 'Impulse' >> beam.Impulse()
128+
| 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS)) # pylint: disable=bad-option-value
129+
| 'Map to bytes' >>
130+
beam.Map(lambda x: RECORD + str(x).encode()).with_output_types(bytes)
131+
| 'WriteToKinesis' >> WriteToKinesis(
132+
stream_name=self.aws_kinesis_stream,
133+
aws_access_key=self.aws_access_key,
134+
aws_secret_key=self.aws_secret_key,
135+
region=self.aws_region,
136+
service_endpoint=self.aws_service_endpoint,
137+
verify_certificate=(not self.use_localstack),
138+
partition_key='1',
139+
aggregation_enabled=True))
140+
121141
def run_kinesis_read(self):
122142
records = [RECORD + str(i).encode() for i in range(NUM_RECORDS)]
123143

sdks/python/apache_beam/io/kinesis.py

+20
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ def default_io_expansion_service():
112112
('partition_key', str),
113113
('service_endpoint', Optional[str]),
114114
('verify_certificate', Optional[bool]),
115+
('aggregation_enabled', Optional[bool]),
116+
('aggregation_max_bytes', Optional[int]),
117+
('aggregation_max_buffered_time', Optional[int]),
118+
('aggregation_shard_refresh_interval', Optional[int]),
115119
],
116120
)
117121

@@ -135,6 +139,10 @@ def __init__(
135139
verify_certificate=None,
136140
producer_properties=None,
137141
expansion_service=None,
142+
aggregation_enabled=None,
143+
aggregation_max_bytes=51200,
144+
aggregation_max_buffered_time=100,
145+
aggregation_shard_refresh_interval=2,
138146
):
139147
"""
140148
Initializes a write operation to Kinesis.
@@ -151,6 +159,13 @@ def __init__(
151159
since the AWS IOs upgraded to v2. Trying to set it will lead to an
152160
error. For more info, see https://github.com/apache/beam/issues/33430.
153161
:param expansion_service: The address (host:port) of the ExpansionService.
162+
:param aggregation_enabled: Enable or disable aggregation.
163+
:param aggregation_max_bytes: Maximum number of bytes to buffer before
164+
sending a batch of records. Defaults to 51200.
165+
:param aggregation_max_buffered_time: Maximum time(millisecond) to buffer
166+
records before sending a batch of records. Defaults to 100.
167+
:param aggregation_shard_refresh_interval: Interval in minutes to refresh
168+
the shard map. Defaults to 2.
154169
"""
155170
if producer_properties is not None:
156171
raise ValueError(
@@ -167,6 +182,11 @@ def __init__(
167182
partition_key=partition_key,
168183
service_endpoint=service_endpoint,
169184
verify_certificate=verify_certificate,
185+
aggregation_enabled=aggregation_enabled,
186+
aggregation_max_bytes=aggregation_max_bytes,
187+
aggregation_max_buffered_time=aggregation_max_buffered_time,
188+
aggregation_shard_refresh_interval=
189+
aggregation_shard_refresh_interval,
170190
)),
171191
expansion_service or default_io_expansion_service(),
172192
)

0 commit comments

Comments
 (0)