Skip to content

Commit 7919c3f

Browse files
Test Avro extension against multiple Avro versions (#25216)
1 parent 0dcb26d commit 7919c3f

File tree

14 files changed

+451
-38
lines changed

14 files changed

+451
-38
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import CommonJobProperties as commonJobProperties
20+
import PostcommitJobBuilder
21+
22+
23+
// This job runs the Java tests that depends on Avro against different Avro API versions
24+
PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_Avro_Versions', 'Run PostCommit_Java_Avro_Versions',
25+
'Java Avro Versions Post Commit Tests', this) {
26+
27+
description('Java Avro Versions Post Commit Tests')
28+
29+
// Set common parameters.
30+
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 240)
31+
32+
// Publish all test results to Jenkins
33+
publishers {
34+
archiveJunit('**/build/test-results/**/*.xml')
35+
}
36+
37+
// Gradle goals for this job.
38+
steps {
39+
gradle {
40+
rootBuildScriptDir(commonJobProperties.checkoutDir)
41+
tasks(":javaAvroVersionsTest")
42+
commonJobProperties.setGradleSwitches(delegate)
43+
// Specify maven home on Jenkins, needed by Maven archetype integration tests.
44+
switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.4')
45+
}
46+
}
47+
}

.test-infra/jenkins/job_PreCommit_Java.groovy

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import PrecommitJobBuilder
2020

2121
// exclude paths with their own PreCommit tasks
2222
def excludePaths = [
23+
'extensions/avro',
2324
'extensions/sql',
2425
'io/amazon-web-services',
2526
'io/amazon-web-services2',

build.gradle.kts

+4
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,10 @@ tasks.register("javaHadoopVersionsTest") {
393393
dependsOn(":runners:spark:3:hadoopVersionsTest")
394394
}
395395

396+
tasks.register("javaAvroVersionsTest") {
397+
dependsOn(":sdks:java:extensions:avro:avroVersionsTest")
398+
}
399+
396400
tasks.register("sqlPostCommit") {
397401
dependsOn(":sdks:java:extensions:sql:postCommit")
398402
dependsOn(":sdks:java:extensions:sql:jdbc:postCommit")

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java

+5
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,11 @@ public T convert(TypeDescriptor typeDescriptor) {
283283
}
284284
}
285285

286+
protected StackManipulation shortCircuitReturnNull(
287+
StackManipulation readValue, StackManipulation onNotNull) {
288+
return new ShortCircuitReturnNull(readValue, onNotNull);
289+
}
290+
286291
protected abstract T convertArray(TypeDescriptor<?> type);
287292

288293
protected abstract T convertIterable(TypeDescriptor<?> type);

sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/SchemaTestUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ private static boolean fieldsEquivalent(Object expected, Object actual, FieldTyp
126126
(Map<Object, Object>) expected,
127127
(Map<Object, Object>) actual,
128128
fieldType.getMapValueType());
129+
} else if (fieldType.getTypeName() == TypeName.ROW) {
130+
return rowsEquivalent((Row) expected, (Row) actual);
129131
} else {
130132
return Objects.equals(expected, actual);
131133
}

sdks/java/extensions/avro/build.gradle

+88-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import java.util.stream.Collectors
2+
13
/*
24
* Licensed to the Apache Software Foundation (ASF) under one
35
* or more contributor license agreements. See the NOTICE file
@@ -27,6 +29,14 @@ applyAvroNature()
2729

2830
description = "Apache Beam :: SDKs :: Java :: Extensions :: Avro"
2931

32+
def avroVersions = [
33+
'192' : "1.9.2",
34+
'1102': "1.10.2",
35+
'1111': "1.11.1",
36+
]
37+
38+
avroVersions.each { k, v -> configurations.create("avroVersion$k") }
39+
3040
// Exclude tests that need a runner
3141
test {
3242
systemProperty "beamUseDummyRunner", "true"
@@ -38,19 +48,94 @@ test {
3848
dependencies {
3949
implementation library.java.byte_buddy
4050
implementation library.java.vendored_guava_26_0_jre
41-
implementation (project(path: ":sdks:java:core", configuration: "shadow")) {
51+
implementation(project(path: ":sdks:java:core", configuration: "shadow")) {
4252
// Exclude Avro dependencies from "core" since Avro support moved to this extension
4353
exclude group: "org.apache.avro", module: "avro"
4454
}
4555
implementation library.java.error_prone_annotations
4656
implementation library.java.avro
4757
implementation library.java.joda_time
48-
testImplementation (project(path: ":sdks:java:core", configuration: "shadowTest")) {
58+
testImplementation(project(path: ":sdks:java:core", configuration: "shadowTest")) {
4959
// Exclude Avro dependencies from "core" since Avro support moved to this extension
5060
exclude group: "org.apache.avro", module: "avro"
5161
}
5262
testImplementation library.java.avro_tests
5363
testImplementation library.java.junit
5464
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
5565
testRuntimeOnly library.java.slf4j_jdk14
56-
}
66+
avroVersions.each {
67+
"avroVersion$it.key" "org.apache.avro:avro:$it.value"
68+
"avroVersion$it.key" "org.apache.avro:avro-tools:$it.value"
69+
}
70+
}
71+
72+
avroVersions.each { kv ->
73+
configurations."avroVersion$kv.key" {
74+
resolutionStrategy {
75+
force "org.apache.avro:avro:$kv.value"
76+
}
77+
}
78+
79+
sourceSets {
80+
"avro${kv.key}" {
81+
java {
82+
srcDirs "build/generated/sources/avro${kv.key}/test/java"
83+
}
84+
85+
compileClasspath = configurations."avroVersion$kv.key" + sourceSets.test.output + sourceSets.test.compileClasspath
86+
runtimeClasspath += compileClasspath + sourceSets.test.runtimeClasspath
87+
}
88+
}
89+
90+
"compileAvro${kv.key}Java" {
91+
checkerFramework {
92+
skipCheckerFramework = true
93+
}
94+
}
95+
96+
"spotbugsAvro${kv.key}" {
97+
ignoreFailures = true
98+
}
99+
100+
"generateAvro${kv.key}AvroJava" {
101+
dependsOn "generateAvroClasses${kv.key}"
102+
}
103+
104+
task "avroVersion${kv.key}Test"(type: Test) {
105+
group = "Verification"
106+
description = "Runs Avro extension tests with Avro version $kv.value"
107+
outputs.upToDateWhen { false }
108+
classpath = sourceSets."avro${kv.key}".runtimeClasspath
109+
110+
include '**/*.class'
111+
exclude '**/AvroIOTest$NeedsRunnerTests$*.class'
112+
113+
dependsOn "generateAvroClasses${kv.key}"
114+
}
115+
116+
task "generateAvroClasses${kv.key}"(type: JavaExec) {
117+
group = "build"
118+
description = "Generate Avro classes for Avro version $kv.value"
119+
classpath = configurations."avroVersion$kv.key"
120+
main = "org.apache.avro.tool.Main"
121+
args = [
122+
"compile",
123+
"schema",
124+
"src/test/avro/org/apache/beam/sdk/extensions/avro/io/user.avsc",
125+
"src/test/avro/org/apache/beam/sdk/extensions/avro/schemas/test.avsc",
126+
"build/generated/sources/avro${kv.key}/test/java"
127+
]
128+
}
129+
}
130+
131+
task avroVersionsTest {
132+
group = "Verification"
133+
description = 'Runs Avro extension tests with different Avro API versions'
134+
dependsOn createTaskNames(avroVersions, "Test")
135+
}
136+
137+
static def createTaskNames(Map<String, String> prefixMap, String suffix) {
138+
return prefixMap.keySet().stream()
139+
.map { version -> "avroVersion${version}${suffix}" }
140+
.collect(Collectors.toList())
141+
}

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ private Object readResolve() throws IOException, ClassNotFoundException {
256256
* Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's
257257
* serialization and hence is able to encode the {@link Schema} object directly.
258258
*/
259-
private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
259+
static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
260260
// writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
261261
// here:
262262
// http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java

+96-9
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,18 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.concurrent.TimeUnit;
3738
import java.util.stream.Collectors;
3839
import javax.annotation.Nonnull;
3940
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
41+
import net.bytebuddy.implementation.bytecode.Division;
4042
import net.bytebuddy.implementation.bytecode.Duplication;
43+
import net.bytebuddy.implementation.bytecode.Multiplication;
4144
import net.bytebuddy.implementation.bytecode.StackManipulation;
4245
import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
4346
import net.bytebuddy.implementation.bytecode.TypeCreation;
4447
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
48+
import net.bytebuddy.implementation.bytecode.constant.LongConstant;
4549
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
4650
import net.bytebuddy.matcher.ElementMatchers;
4751
import org.apache.avro.AvroRuntimeException;
@@ -144,13 +148,22 @@
144148
"rawtypes"
145149
})
146150
public class AvroUtils {
151+
147152
static {
148153
// This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling
149154
// of DateTime types.
150155
SpecificData.get().addLogicalTypeConversion(new AvroCoder.JodaTimestampConversion());
151156
GenericData.get().addLogicalTypeConversion(new AvroCoder.JodaTimestampConversion());
152157
}
153158

159+
private static final ForLoadedType BYTES = new ForLoadedType(byte[].class);
160+
private static final ForLoadedType JAVA_INSTANT = new ForLoadedType(java.time.Instant.class);
161+
private static final ForLoadedType JAVA_LOCALE_DATE =
162+
new ForLoadedType(java.time.LocalDate.class);
163+
private static final ForLoadedType JODA_READABLE_INSTANT =
164+
new ForLoadedType(ReadableInstant.class);
165+
private static final ForLoadedType JODA_INSTANT = new ForLoadedType(Instant.class);
166+
154167
// Unwrap an AVRO schema into the base type an whether it is nullable.
155168
public static class TypeWithNullability {
156169
public final org.apache.avro.Schema type;
@@ -254,7 +267,10 @@ public AvroConvertType(boolean returnRawType) {
254267

255268
@Override
256269
protected java.lang.reflect.Type convertDefault(TypeDescriptor<?> type) {
257-
if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
270+
if (type.isSubtypeOf(TypeDescriptor.of(java.time.Instant.class))
271+
|| type.isSubtypeOf(TypeDescriptor.of(java.time.LocalDate.class))) {
272+
return convertDateTime(type);
273+
} else if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
258274
return byte[].class;
259275
} else {
260276
return super.convertDefault(type);
@@ -282,10 +298,46 @@ protected StackManipulation convertDefault(TypeDescriptor<?> type) {
282298
MethodInvocation.invoke(
283299
new ForLoadedType(GenericFixed.class)
284300
.getDeclaredMethods()
285-
.filter(
286-
ElementMatchers.named("bytes")
287-
.and(ElementMatchers.returns(new ForLoadedType(byte[].class))))
301+
.filter(ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTES)))
288302
.getOnly()));
303+
} else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
304+
// Generates the following code:
305+
// return Instant.ofEpochMilli(value.toEpochMilli())
306+
StackManipulation onNotNull =
307+
new Compound(
308+
readValue,
309+
MethodInvocation.invoke(
310+
JAVA_INSTANT
311+
.getDeclaredMethods()
312+
.filter(ElementMatchers.named("toEpochMilli"))
313+
.getOnly()),
314+
MethodInvocation.invoke(
315+
JODA_INSTANT
316+
.getDeclaredMethods()
317+
.filter(
318+
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
319+
.getOnly()));
320+
return shortCircuitReturnNull(readValue, onNotNull);
321+
} else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) {
322+
// Generates the following code:
323+
// return Instant.ofEpochMilli(value.toEpochDay() * TimeUnit.DAYS.toMillis(1))
324+
StackManipulation onNotNull =
325+
new Compound(
326+
readValue,
327+
MethodInvocation.invoke(
328+
JAVA_LOCALE_DATE
329+
.getDeclaredMethods()
330+
.filter(ElementMatchers.named("toEpochDay"))
331+
.getOnly()),
332+
LongConstant.forValue(TimeUnit.DAYS.toMillis(1)),
333+
Multiplication.LONG,
334+
MethodInvocation.invoke(
335+
JODA_INSTANT
336+
.getDeclaredMethods()
337+
.filter(
338+
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
339+
.getOnly()));
340+
return shortCircuitReturnNull(readValue, onNotNull);
289341
}
290342
return super.convertDefault(type);
291343
}
@@ -303,25 +355,60 @@ protected TypeConversionsFactory getFactory() {
303355

304356
@Override
305357
protected StackManipulation convertDefault(TypeDescriptor<?> type) {
306-
final ForLoadedType byteArrayType = new ForLoadedType(byte[].class);
307358
if (type.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
308359
// Generate the following code:
309-
// return new T((byte[]) value);
360+
// return new T((byte[]) value);
310361
ForLoadedType loadedType = new ForLoadedType(type.getRawType());
311362
return new Compound(
312363
TypeCreation.of(loadedType),
313364
Duplication.SINGLE,
314365
// Load the parameter and cast it to a byte[].
315366
readValue,
316-
TypeCasting.to(byteArrayType),
367+
TypeCasting.to(BYTES),
317368
// Create a new instance that wraps this byte[].
318369
MethodInvocation.invoke(
319370
loadedType
320371
.getDeclaredMethods()
321372
.filter(
322-
ElementMatchers.isConstructor()
323-
.and(ElementMatchers.takesArguments(byteArrayType)))
373+
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(BYTES)))
324374
.getOnly()));
375+
} else if (java.time.Instant.class.isAssignableFrom(type.getRawType())) {
376+
// Generates the following code:
377+
// return java.time.Instant.ofEpochMilli(value.getMillis())
378+
StackManipulation onNotNull =
379+
new Compound(
380+
readValue,
381+
MethodInvocation.invoke(
382+
JODA_READABLE_INSTANT
383+
.getDeclaredMethods()
384+
.filter(ElementMatchers.named("getMillis"))
385+
.getOnly()),
386+
MethodInvocation.invoke(
387+
JAVA_INSTANT
388+
.getDeclaredMethods()
389+
.filter(
390+
ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochMilli")))
391+
.getOnly()));
392+
return shortCircuitReturnNull(readValue, onNotNull);
393+
} else if (java.time.LocalDate.class.isAssignableFrom(type.getRawType())) {
394+
// Generates the following code:
395+
// return java.time.LocalDate.ofEpochDay(value.getMillis() / TimeUnit.DAYS.toMillis(1))
396+
StackManipulation onNotNull =
397+
new Compound(
398+
readValue,
399+
MethodInvocation.invoke(
400+
JODA_READABLE_INSTANT
401+
.getDeclaredMethods()
402+
.filter(ElementMatchers.named("getMillis"))
403+
.getOnly()),
404+
LongConstant.forValue(TimeUnit.DAYS.toMillis(1)),
405+
Division.LONG,
406+
MethodInvocation.invoke(
407+
JAVA_LOCALE_DATE
408+
.getDeclaredMethods()
409+
.filter(ElementMatchers.isStatic().and(ElementMatchers.named("ofEpochDay")))
410+
.getOnly()));
411+
return shortCircuitReturnNull(readValue, onNotNull);
325412
}
326413
return super.convertDefault(type);
327414
}

0 commit comments

Comments
 (0)