Skip to content

Commit 3ca3319

Browse files
committed
Abris #246 fix profile for Spark 2.3
Abris #246 code review changes II
1 parent 088b61b commit 3ca3319

File tree

5 files changed

+40
-19
lines changed

5 files changed

+40
-19
lines changed

pom.xml

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969

7070
<!--Platforms-->
7171
<spark.version>${spark-24.version}</spark.version>
72+
<spark.avro.version>${spark.version}</spark.avro.version>
7273
<kafka.spark.version>0-10</kafka.spark.version>
7374
<confluent.version>5.3.4</confluent.version>
7475

@@ -159,7 +160,7 @@
159160
<dependency>
160161
<groupId>org.apache.spark</groupId>
161162
<artifactId>spark-avro_${scala.compat.version}</artifactId>
162-
<version>${spark.version}</version>
163+
<version>${spark.avro.version}</version>
163164
</dependency>
164165

165166
<!-- Avro -->
@@ -334,6 +335,9 @@
334335
<id>spark-2.3</id>
335336
<properties>
336337
<spark.version>${spark-23.version}</spark.version>
338+
<spark.avro.version>${spark-24.version}</spark.avro.version>
339+
<avro.version>1.8.2</avro.version>
340+
<confluent.version>5.3.4</confluent.version>
337341
</properties>
338342
</profile>
339343
<profile>

src/main/scala/org/apache/spark/sql/avro/AbrisAvroDeserializer.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
3232
val clazz = classOf[AvroDeserializer]
3333
Try {
3434
clazz.getConstructor(classOf[Schema], classOf[DataType])
35-
.newInstance(rootAvroType, rootCatalystType)
35+
.newInstance(rootAvroType, rootCatalystType) // Spark 2.4 -
3636
}.recover { case _: NoSuchMethodException =>
3737
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String])
38-
.newInstance(rootAvroType, rootCatalystType, "LEGACY")
38+
.newInstance(rootAvroType, rootCatalystType, "LEGACY") // Spark 3.0 +
3939
}
4040
.get
4141
.asInstanceOf[AvroDeserializer]
@@ -49,8 +49,8 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
4949

5050
def deserialize(data: Any): Any = {
5151
deserializeMethod(data) match {
52-
case Some(x) => x
53-
case x => x
52+
case Some(x) => x // Spark 3.1 +
53+
case x => x // Spark 3.0 -
5454
}
5555
}
5656

src/main/scala/za/co/absa/abris/avro/read/confluent/SchemaManagerFactory.scala

+12-6
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
package za.co.absa.abris.avro.read.confluent
1919

2020
import org.apache.spark.internal.Logging
21-
import za.co.absa.abris.avro.registry.{AbrisRegistryClient, ConfluentRegistryClient}
21+
import za.co.absa.abris.avro.registry.{AbrisRegistryClient, ConfluentMockRegistryClient, ConfluentRegistryClient}
2222
import za.co.absa.abris.config.AbrisConfig
2323
import za.co.absa.commons.annotation.DeveloperApi
2424

2525
import scala.collection.concurrent
26+
import scala.util.Try
2627
import scala.util.control.NonFatal
2728

2829
/**
@@ -53,16 +54,21 @@ object SchemaManagerFactory extends Logging {
5354
try {
5455
val clazz = Class.forName(configs(AbrisConfig.REGISTRY_CLIENT_CLASS))
5556
logInfo(msg = s"Configuring new Schema Registry client of type '${clazz.getCanonicalName}'")
56-
clazz
57-
.getDeclaredConstructor(classOf[Map[String, String]])
58-
.newInstance(configs)
57+
Try(clazz.getConstructor(classOf[Map[String, String]]).newInstance(configs))
58+
.recover { case _: NoSuchMethodException =>
59+
clazz.getConstructor().newInstance()
60+
}
61+
.get
5962
.asInstanceOf[AbrisRegistryClient]
6063
} catch {
6164
case e if NonFatal(e) =>
6265
throw new IllegalArgumentException("Custom registry client must implement AbrisRegistryClient " +
63-
"and have constructor accepting Map[String, String]", e)
66+
"and have parameterless or Map[String, String] accepting constructor", e)
6467
}
65-
} else {
68+
} else if (configs(AbrisConfig.SCHEMA_REGISTRY_URL).startsWith("mock://")) {
69+
logInfo(msg = s"Configuring new Schema Registry client of type ConfluentMockRegistryClient")
70+
new ConfluentMockRegistryClient()
71+
} else {
6672
logInfo(msg = s"Configuring new Schema Registry client of type ConfluentRegistryClient")
6773
new ConfluentRegistryClient(configs)
6874
}

src/main/scala/za/co/absa/abris/avro/registry/ConfluentMockRegistryClient.scala

+6-7
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,16 @@
1717
package za.co.absa.abris.avro.registry
1818

1919
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
20-
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata}
20+
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
2121
import org.apache.avro.Schema
2222

2323
import java.io.IOException
2424
import java.util
2525

2626

27-
class ConfluentMockRegistryClient extends AbrisRegistryClient {
28-
29-
private val client = new MockSchemaRegistryClient()
27+
class ConfluentMockRegistryClient(client: SchemaRegistryClient) extends AbrisRegistryClient {
3028

29+
def this() = this(new MockSchemaRegistryClient())
3130

3231
override def getAllVersions(subject: String): util.List[Integer] =
3332
client.getAllVersions(subject)
@@ -44,10 +43,10 @@ class ConfluentMockRegistryClient extends AbrisRegistryClient {
4443
@throws[IOException]
4544
@throws[RestClientException]
4645
override def getLatestSchemaMetadata(subject: String): SchemaMetadata = {
47-
try (client.getLatestSchemaMetadata(subject))
46+
try client.getLatestSchemaMetadata(subject)
4847
catch {
49-
case e: IOException if e.getMessage == "No schema registered under subject!"
50-
=> throw new RestClientException("No schema registered under subject!", 404, 40401)
48+
case e: IOException if e.getMessage == "No schema registered under subject!" =>
49+
throw new RestClientException("No schema registered under subject!", 404, 40401)
5150
}
5251
}
5352

src/test/scala/za/co/absa/abris/avro/read/confluent/SchemaManagerFactorySpec.scala

+13-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package za.co.absa.abris.avro.read.confluent
1818

1919
import org.scalatest.{BeforeAndAfterEach, FlatSpec}
20-
import za.co.absa.abris.avro.registry.{AbrisRegistryClient, ConfluentRegistryClient, TestRegistryClient}
20+
import za.co.absa.abris.avro.registry.{AbrisRegistryClient, ConfluentMockRegistryClient, ConfluentRegistryClient, TestRegistryClient}
2121
import za.co.absa.abris.config.AbrisConfig
2222

2323
import scala.reflect.runtime.{universe => ru}
@@ -78,4 +78,16 @@ class SchemaManagerFactorySpec extends FlatSpec with BeforeAndAfterEach {
7878
assert(res1.isInstanceOf[ConfluentRegistryClient])
7979
assert(res3.isInstanceOf[TestRegistryClient])
8080
}
81+
82+
it should "create mock client when url starts with mock://" in {
83+
val config = Map(AbrisConfig.SCHEMA_REGISTRY_URL -> "mock://dummy_sr_2")
84+
85+
val schemaManagerRef1 = SchemaManagerFactory.create(config)
86+
87+
val m = ru.runtimeMirror(schemaManagerRef1.getClass.getClassLoader)
88+
val fieldTerm = ru.typeOf[SchemaManager].decl(ru.TermName("schemaRegistryClient")).asTerm
89+
90+
val res1 = m.reflect(schemaManagerRef1).reflectField(fieldTerm).get.asInstanceOf[AbrisRegistryClient]
91+
assert(res1.isInstanceOf[ConfluentMockRegistryClient])
92+
}
8193
}

0 commit comments

Comments
 (0)