Skip to content

Commit

Permalink
Abris #246 code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
cerveada committed Nov 15, 2021
1 parent 76cd11b commit 44a56dd
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
val clazz = classOf[AvroDeserializer]
Try {
clazz.getConstructor(classOf[Schema], classOf[DataType])
.newInstance(rootAvroType, rootCatalystType)
.newInstance(rootAvroType, rootCatalystType) // Spark 2.4 -
}.recover { case _: NoSuchMethodException =>
clazz.getConstructor(classOf[Schema], classOf[DataType], classOf[String])
.newInstance(rootAvroType, rootCatalystType, "LEGACY")
.newInstance(rootAvroType, rootCatalystType, "LEGACY") // Spark 3.0 +
}
.get
.asInstanceOf[AvroDeserializer]
Expand All @@ -49,8 +49,8 @@ class AbrisAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {

def deserialize(data: Any): Any = {
deserializeMethod(data) match {
case Some(x) => x
case x => x
case Some(x) => x // Spark 3.1 +
case x => x // Spark 3.0 -
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import za.co.absa.abris.config.AbrisConfig
import za.co.absa.commons.annotation.DeveloperApi

import scala.collection.concurrent
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -53,14 +54,16 @@ object SchemaManagerFactory extends Logging {
try {
val clazz = Class.forName(configs(AbrisConfig.REGISTRY_CLIENT_CLASS))
logInfo(msg = s"Configuring new Schema Registry client of type '${clazz.getCanonicalName}'")
clazz
.getDeclaredConstructor(classOf[Map[String, String]])
.newInstance(configs)
Try(clazz.getConstructor(classOf[Map[String, String]]).newInstance(configs))
.recover { case _: NoSuchMethodException =>
clazz.getConstructor().newInstance()
}
.get
.asInstanceOf[AbrisRegistryClient]
} catch {
case e if NonFatal(e) =>
throw new IllegalArgumentException("Custom registry client must implement AbrisRegistryClient " +
"and have constructor accepting Map[String, String]", e)
"and have parameterless or Map[String, String] accepting constructor", e)
}
} else {
logInfo(msg = s"Configuring new Schema Registry client of type ConfluentRegistryClient")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@
package za.co.absa.abris.avro.registry

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata}
import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaMetadata, SchemaRegistryClient}
import org.apache.avro.Schema

import java.io.IOException
import java.util


class ConfluentMockRegistryClient extends AbrisRegistryClient {

private val client = new MockSchemaRegistryClient()
class ConfluentMockRegistryClient(client: SchemaRegistryClient) extends AbrisRegistryClient {

def this() = this(new MockSchemaRegistryClient())

override def getAllVersions(subject: String): util.List[Integer] =
client.getAllVersions(subject)
Expand All @@ -44,10 +43,10 @@ class ConfluentMockRegistryClient extends AbrisRegistryClient {
@throws[IOException]
@throws[RestClientException]
override def getLatestSchemaMetadata(subject: String): SchemaMetadata = {
try (client.getLatestSchemaMetadata(subject))
try client.getLatestSchemaMetadata(subject)
catch {
case e: IOException if e.getMessage == "No schema registered under subject!"
=> throw new RestClientException("No schema registered under subject!", 404, 40401)
case e: IOException if e.getMessage == "No schema registered under subject!" =>
throw new RestClientException("No schema registered under subject!", 404, 40401)
}
}

Expand Down

0 comments on commit 44a56dd

Please sign in to comment.