Job failed with java.lang.NoSuchMethodError on AWS Glue 4.0 #363

Open
xmubeta opened this issue Sep 10, 2024 · 0 comments

xmubeta commented Sep 10, 2024

Hello,

I am building a data pipeline: MySQL CDC -> Kafka Connect (Debezium + Confluent Schema Registry) -> Kafka -> AWS Glue -> S3 (Hudi format). While using Abris to read the data with the schema registry, I get the following error:

java.lang.NoSuchMethodError: io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.getSchemaById(I)Lio/confluent/kafka/schemaregistry/ParsedSchema;
	at za.co.absa.abris.avro.registry.AbstractConfluentRegistryClient.getById(AbstractConfluentRegistryClient.scala:44) ~[abris_2.12-6.4.1.jar:?]
	at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaById(SchemaManager.scala:49) ~[abris_2.12-6.4.1.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.$anonfun$downloadWriterSchema$1(AvroDataToCatalyst.scala:160) ~[abris_2.12-6.4.1.jar:?]
	at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.downloadWriterSchema(AvroDataToCatalyst.scala:160) ~[abris_2.12-6.4.1.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.$anonfun$decodeConfluentAvro$1(AvroDataToCatalyst.scala:151) ~[abris_2.12-6.4.1.jar:?]
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86) ~[scala-library-2.12.15.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decodeConfluentAvro(AvroDataToCatalyst.scala:150) ~[abris_2.12-6.4.1.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decode(AvroDataToCatalyst.scala:132) ~[abris_2.12-6.4.1.jar:?]
	at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:79) ~[abris_2.12-6.4.1.jar:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:968) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at scala.collection.Iterator.isEmpty(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.isEmpty(WholeStageCodegenExec.scala:966) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:102) ~[hudi-spark3-bundle_2.12-0.12.1.jar:0.12.1]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.sql.execution.SQLConfInjectingRDD.compute(SQLConfInjectingRDD.scala:58) ~[hudi-spark3-bundle_2.12-0.12.1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1517) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_412]
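
From what I understand, a NoSuchMethodError like this usually means an older schema registry client is being picked up on the classpath at runtime than the one Abris was compiled against, since getSchemaById(Int): ParsedSchema only exists in newer client versions. As a sanity check I was thinking of running something like the sketch below on the cluster (the object name is just made up for illustration) to see which jar actually provides SchemaRegistryClient and whether it declares that method:

import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient

// Hypothetical diagnostic, not part of the job: print which jar provided the
// SchemaRegistryClient interface and whether it declares the method that the
// stack trace above reports as missing.
object ClasspathCheck {
  def main(args: Array[String]): Unit = {
    val clazz = classOf[SchemaRegistryClient]
    // Jar (or path) the interface was loaded from, if available
    val source = Option(clazz.getProtectionDomain.getCodeSource).map(_.getLocation)
    println(s"SchemaRegistryClient loaded from: $source")
    // true only if the classpath version has getSchemaById(Int): ParsedSchema
    val hasMethod = clazz.getMethods.exists { m =>
      m.getName == "getSchemaById" && m.getReturnType == classOf[ParsedSchema]
    }
    println(s"getSchemaById returning ParsedSchema: $hasMethod")
  }
}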

Environment: AWS Glue 4.0
External jars: kafka-schema-registry-client-6.2.1.jar,kafka-avro-serializer-6.2.1.jar,abris_2.12-6.4.0.jar,commons_2.12-1.0.0.jar
Abris version: Tried both 6.4.0 and 6.4.1
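
For reference, this is roughly how I would pin those dependencies in sbt (a sketch only; the resolver is the public Confluent Maven repository and the version alignment simply mirrors the jars listed above, not a verified fix):

// Hypothetical build.sbt fragment; versions mirror the external jars above.
resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "za.co.absa"   %% "abris"                        % "6.4.0",
  "io.confluent"  % "kafka-schema-registry-client" % "6.2.1",
  "io.confluent"  % "kafka-avro-serializer"        % "6.2.1"
)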

Test code:

package com.nwcd.beta

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

object KafkaToHudiOnGlue {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("KafkaToHudiS3WithSchemaRegistry")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .config("enableHiveSupport","true")
      .config("spark.hadoop.hive.metastore.client.factory.class","com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
      .master("local[*]")
      .getOrCreate()

    val kafkaBootstrapServers = "172.31.1.22:9092"
    val kafkaTopic = db.example1"
    val schemaRegistryUrl = "http://172.31.1.22:8081"
    // S3 configuration
    val tableName = "hudi_example1"
    val s3BucketPath = s"s3a://s3bucket/glue/streaming/$tableName/"
    val checkpointLocation = s"s3a://s3bucket/glue/checkpoint/$tableName/"

    // Read from Kafka with Schema Registry
    val kafkaStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "earliest")
      .load()


  
    // Parse the Avro data using Schema Registry
    val abrisConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy(kafkaTopic, isKey = false)
      .usingSchemaRegistry(schemaRegistryUrl)


    val parsedStream = kafkaStream.select(
      from_avro(col("value"), abrisConfig) as 'data
    ).select("data.*")

    // Extract the 'after' data and operation type
    val transformedStream = parsedStream.select(
      col("after.*"),
      col("op").alias("operation")
    ).filter(col("customerId").isNotNull)
	
    transformedStream.printSchema()
    
    // Write to Hudi
    val query = transformedStream.writeStream
      .format("hudi")
      .option("hoodie.table.name", tableName)
      .option("hoodie.table.type","COPY_ON_WRITE")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "customerId")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "customerCity")
      .option("hoodie.datasource.write.hive_style_partitioning","true")
      .option("hoodie.datasource.hive_sync.mode","hms")
      .option("hoodie.datasource.hive_sync.use_jdbc","false")
      .option("hoodie.datasource.hive_sync.database","default")
      .option("hoodie.datasource.hive_sync.table", tableName)
      .option("hoodie.datasource.hive_sync.enable", "true")
      .option("hoodie.datasource.hive_sync.partition_fields", "customerCity")
      .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option("hoodie.datasource.hive_sync.support_timestamp", "true")
      .option("hoodie.datasource.hive_sync.use_table_location", "true")
      .option("checkpointLocation", checkpointLocation)
      .trigger(Trigger.ProcessingTime("1 minutes"))
      .outputMode("append")
      .start(s3BucketPath)

    query.awaitTermination()

  }
}

I found that other people use CachedSchemaRegistryClient, but it is not called here. Please help me. Thank you.
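
To narrow it down, I was also planning to call CachedSchemaRegistryClient directly outside Spark, with the same external jars as the Glue job (a sketch; the subject name just follows TopicNameStrategy for my topic). If this also fails with a linkage error, the problem would seem to be the client jar rather than Abris itself:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Hypothetical standalone check, run with the same jars as the Glue job.
object RegistryClientCheck {
  def main(args: Array[String]): Unit = {
    val schemaRegistryUrl = "http://172.31.1.22:8081"   // same registry as the job above
    val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 32)
    // TopicNameStrategy: "<topic>-value" for the record value schema
    val meta = client.getLatestSchemaMetadata("db.example1-value")
    // This is the exact call the stack trace above says is missing
    val parsed = client.getSchemaById(meta.getId)
    println(parsed.canonicalString())
  }
}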
