[SPARK-44751][SQL] XML FileFormat Interface implementation #42462

Closed
wants to merge 12 commits into from
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -589,6 +589,11 @@
"<errors>"

This is great! I had been thinking about upgrading the XML reader to Data Source V2 before, but was put off by the refactoring work involved. Thanks for adding it to the Spark mainline to unify the interfaces and catch up with the main changes.

Contributor Author

Thanks for reviewing this PR.

You're welcome! This is great!

]
},
"INVALID_XML_MAP_KEY_TYPE" : {
"message" : [
"Input schema <schema> can only contain STRING as a key type for a MAP."
]
},
"IN_SUBQUERY_DATA_TYPE_MISMATCH" : {
"message" : [
"The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [<mismatchedColumns>], left side: [<leftType>], right side: [<rightType>]."
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-datatype-mismatch-error-class.md
@@ -123,6 +123,10 @@ The `<functionName>` does not support ordering on type `<dataType>`.

`<errors>`

## INVALID_XML_MAP_KEY_TYPE

Input schema `<schema>` can only contain STRING as a key type for a MAP.

## IN_SUBQUERY_DATA_TYPE_MISMATCH

The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [`<mismatchedColumns>`], left side: [`<leftType>`], right side: [`<rightType>`].
@@ -117,4 +117,26 @@ object ExprUtils extends QueryErrorsBase {
TypeCheckSuccess
}
}

/**
* Check if the schema is valid for XML.
*
* @param schema The schema to check.
* @return
* `TypeCheckSuccess` if the schema is valid
* `DataTypeMismatch` with an error if the schema is not valid
*/
def checkXmlSchema(schema: DataType): TypeCheckResult = {
val isInvalid = schema.existsRecursively {
case MapType(keyType, _, _) if keyType != StringType => true
case _ => false
}
if (isInvalid) {
DataTypeMismatch(
errorSubClass = "INVALID_XML_MAP_KEY_TYPE",
messageParameters = Map("schema" -> toSQLType(schema)))
} else {
TypeCheckSuccess
}
}
}
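
As a rough illustration of the check above (not part of the diff): a schema is rejected as soon as any nested `MapType` uses a non-STRING key. The struct fields and types below are made up for the example; package locations follow the Spark source.

import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.types._

// Passes: the map is keyed by STRING.
val validSchema = new StructType()
  .add("attrs", MapType(StringType, StringType))

// Fails with DATATYPE_MISMATCH.INVALID_XML_MAP_KEY_TYPE: the nested map is
// keyed by INT, and existsRecursively finds it inside the array element type.
val invalidSchema = new StructType()
  .add("counts", ArrayType(MapType(IntegerType, LongType)))

ExprUtils.checkXmlSchema(validSchema)   // TypeCheckSuccess
ExprUtils.checkXmlSchema(invalidSchema) // DataTypeMismatch(...)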
@@ -3277,6 +3277,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("jsonSchema" -> toSQLType(schema)))
}

def invalidXmlSchema(schema: DataType): Throwable = {
new AnalysisException(
errorClass = "INVALID_XML_SCHEMA_MAP_TYPE",
messageParameters = Map("xmlSchema" -> toSQLType(schema)))
}

def tableIndexNotSupportedError(errorMessage: String): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1332",
@@ -22,7 +22,7 @@ org.apache.spark.sql.execution.datasources.noop.NoopDataSource
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
org.apache.spark.sql.execution.datasources.xml.DefaultSource
org.apache.spark.sql.execution.datasources.xml.XMLFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
82 changes: 82 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -39,6 +39,9 @@ import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.datasources.xml.{TextInputXMLDataSource, XmlOptions}
import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
import org.apache.spark.sql.execution.datasources.xml.util.XmlUtils.checkXmlSchema
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -537,6 +540,85 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
@scala.annotation.varargs
def csv(paths: String*): DataFrame = format("csv").load(paths : _*)

/**
* Loads an XML file and returns the result as a `DataFrame`. See the documentation on the
* other overloaded `xml()` method for more details.
*/
def xml(path: String): DataFrame = {
// This method ensures that calls that explicitly need a single argument work, see SPARK-16009
xml(Seq(path): _*)
}

/**
* Loads XML files and returns the result as a `DataFrame`.
*
* This function will go through the input once to determine the input schema if `inferSchema`
* is enabled. To avoid going through the entire data once, disable `inferSchema` option or
* specify the schema explicitly using `schema`.
*
* You can find the XML-specific options for reading XML files in
* <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
* Data Source Option</a> in the version you use.
*
*/
@scala.annotation.varargs
def xml(paths: String*): DataFrame = format("xml").load(paths: _*)

/**
* Loads an `RDD[String]` storing XML objects and returns the result as a `DataFrame`.
*
* Unless the schema is specified using `schema` function, this function goes through the
* input once to determine the input schema.
*
* @param xmlRDD input RDD with one XML object per record
* @since
*/
// @deprecated("Use xml(Dataset[String]) instead.", "2.2.0")
def xml(xmlRDD: RDD[String]): DataFrame = {
xml(sparkSession.createDataset(xmlRDD)(Encoders.STRING))
}

/**
* Loads a `Dataset[String]` storing XML objects and returns the result as a `DataFrame`.
*
* If the schema is not specified using `schema` function and `inferSchema` option is enabled,
* this function goes through the input once to determine the input schema.
*
* @param xmlDataset input Dataset with one XML object per record
*/
def xml(xmlDataset: Dataset[String]): DataFrame = {
val parsedOptions: XmlOptions = new XmlOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)

userSpecifiedSchema.foreach(checkXmlSchema)

val schema = userSpecifiedSchema.map {
case s if !SQLConf.get.getConf(
SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION) => s.asNullable
case other => other
}.getOrElse {
TextInputXMLDataSource.inferFromDataset(xmlDataset, parsedOptions)
}

ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

val parsed = xmlDataset.rdd.mapPartitions { iter =>
val rawParser = new StaxXmlParser(actualSchema, parsedOptions)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = xmlDataset.isStreaming)
}
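
A minimal usage sketch for the reader entry points added in this file (illustrative only; the SparkSession, file path, and sample records are assumptions, while `rowTag` is a standard option of the XML source):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// File-based read; the schema is inferred with one pass over the input.
val fromFiles = spark.read
  .option("rowTag", "book")   // XML element that maps to a row
  .xml("/tmp/books.xml")      // hypothetical path

// Dataset[String]-based read: one XML object per record; malformed records
// are handled by FailureSafeParser according to the configured parse mode.
val records = Seq("<book><title>Spark</title></book>").toDS()
val fromDataset = spark.read.xml(records)
fromDataset.printSchema()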

/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the documentation
* on the other overloaded `parquet()` method for more details.
32 changes: 32 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -850,6 +850,38 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
format("csv").save(path)
}

/**
* Saves the content of the `DataFrame` in XML format at the specified path.
* This is equivalent to:
* {{{
* format("xml").save(path)
* }}}
*
* Note that writing an XML file from a [[DataFrame]] having a field of [[ArrayType]] with
* its element as [[ArrayType]] would have an additional nested field for the element.
* For example, the [[DataFrame]] having a field below,
*
* fieldA [[data1, data2]]
*
* would produce an XML file below.
*
* <fieldA>
* <item>data1</item>
* </fieldA>
* <fieldA>
* <item>data2</item>
* </fieldA>
*
* Namely, a round trip of writing and reading can end up with a different schema structure.
*
* You can find the XML-specific options for writing XML files in
* <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
* Data Source Option</a> in the version you use.
*/
def xml(path: String): Unit = {
format("xml").save(path)
}
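
For symmetry, a sketch of the writer side (assumes an existing DataFrame `df` and a scratch path; `rowTag` and `rootTag` are the usual XML source options):

df.write
  .option("rowTag", "book")    // element name emitted per row
  .option("rootTag", "books")  // enclosing root element of each file
  .mode("overwrite")
  .xml("/tmp/books_out")       // hypothetical path

// As the scaladoc above notes, array-of-array fields gain a nested <item>
// element, so reading the output back may yield a slightly different schema.
val roundTripped = spark.read.option("rowTag", "book").xml("/tmp/books_out")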

/**
* Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
@@ -44,6 +44,7 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.execution.datasources.xml.XMLFileFormat
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
import org.apache.spark.sql.internal.SQLConf
@@ -567,6 +568,7 @@ object DataSource extends Logging {
private val backwardCompatibilityMap: Map[String, String] = {
val jdbc = classOf[JdbcRelationProvider].getCanonicalName
val json = classOf[JsonFileFormat].getCanonicalName
val xml = classOf[XMLFileFormat].getCanonicalName
val parquet = classOf[ParquetFileFormat].getCanonicalName
val csv = classOf[CSVFileFormat].getCanonicalName
val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
@@ -595,6 +597,8 @@
"org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
"org.apache.spark.ml.source.libsvm" -> libsvm,
"com.databricks.spark.csv" -> csv,
"com.databricks.spark.xml" -> xml,
"org.apache.spark.sql.execution.datasources.xml" -> xml,
"org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
"org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
)
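
The two map entries above mean the legacy package names keep resolving to the built-in format, so a sketch like the following (assuming a SparkSession `spark` and a hypothetical path) loads through `XMLFileFormat` without code changes:

val legacyStyle = spark.read
  .format("com.databricks.spark.xml")   // rewritten to XMLFileFormat
  .option("rowTag", "book")
  .load("/tmp/books.xml")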
@@ -188,7 +188,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
csv.flatMap { lines =>
val path = new Path(lines.getPath())
UnivocityParser.tokenizeStream(

This file was deleted.
