
Commit 1ea012b

sandip-db authored and vpolet committed
[SPARK-44751][SQL] XML FileFormat Interface implementation
### What changes were proposed in this pull request?

This is the second PR related to the built-in XML data source implementation ([jira](https://issues.apache.org/jira/browse/SPARK-44751)). The previous [PR](apache#41832) ported the spark-xml package. This PR addresses the following:

- Implement the FileFormat interface
- Address the review comments in the previous [XML PR](apache#41832)
- Move from_xml and schema_of_xml to sql/functions
- Move ".xml" to DataFrameReader/DataFrameWriter
- Remove old APIs like XmlRelation, XmlReader, etc.
- StaxXmlParser changes:
  - Use FailureSafeParser
  - Convert 'Row' usage to 'InternalRow'
  - Convert String to UTF8String
  - Handle MapData and ArrayData for MapType and ArrayType respectively
  - Use TimestampFormatter to parse timestamps
  - Use DateFormatter to parse dates
- StaxXmlGenerator changes:
  - Convert 'Row' usage to 'InternalRow'
  - Handle UTF8String for StringType
  - Handle MapData and ArrayData for MapType and ArrayType respectively
  - Use TimestampFormatter to format timestamps
  - Use DateFormatter to format dates
- Update the XML tests to reflect the above changes

### Why are the changes needed?

These changes bring the XML data source to parity with CSV and JSON and enable features like streaming, which requires the FileFormat interface to be implemented.

### Does this PR introduce _any_ user-facing change?

Yes, it adds support for the XML data source.

### How was this patch tested?

- Ran all the XML unit tests.
- GitHub Actions

Closes apache#42462 from sandip-db/xml-file-format-master.

Authored-by: Sandip Agarwala <131817656+sandip-db@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent d4a5e8a commit 1ea012b
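
Because StaxXmlParser now goes through FailureSafeParser, malformed records should follow the same parse modes as the CSV and JSON sources. A minimal sketch, assuming the XML reader honors the standard `mode` and `columnNameOfCorruptRecord` options (paths are illustrative):

// PERMISSIVE keeps malformed records, nulling fields and capturing the raw XML:
val permissive = spark.read
  .option("rowTag", "book")                                // pre-existing spark-xml option
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .xml("/data/books.xml")

// FAILFAST throws on the first malformed record:
val strict = spark.read
  .option("rowTag", "book")
  .option("mode", "FAILFAST")
  .xml("/data/books.xml")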

File tree

50 files changed: +2678 −1822 lines

common/utils/src/main/resources/error/error-classes.json (+5)

@@ -579,6 +579,11 @@
       "<errors>"
     ]
   },
+  "INVALID_XML_MAP_KEY_TYPE" : {
+    "message" : [
+      "Input schema <schema> can only contain STRING as a key type for a MAP."
+    ]
+  },
   "IN_SUBQUERY_DATA_TYPE_MISMATCH" : {
     "message" : [
       "The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [<mismatchedColumns>], left side: [<leftType>], right side: [<rightType>]."
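
For context, a hedged sketch of input that should surface this new error class, assuming from_xml validates its schema through the check added in ExprUtils below (`df` and its `value` column are hypothetical):

import org.apache.spark.sql.functions.{col, from_xml}
import org.apache.spark.sql.types.{IntegerType, MapType, StringType, StructField, StructType}

// A MAP with a non-STRING key type is now rejected for XML:
val badSchema = StructType(Seq(
  StructField("attrs", MapType(IntegerType, StringType)))) // key type must be STRING
df.select(from_xml(col("value"), badSchema))                // expected: INVALID_XML_MAP_KEY_TYPE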

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala (+40)

@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    xml(Seq(path): _*)
+  }
+
+  /**
+   * Loads XML files and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can find the XML-specific options for reading XML files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def xml(paths: String*): DataFrame = format("xml").load(paths: _*)
+
+  /**
+   * Loads an `Dataset[String]` storing XML object and returns the result as a `DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * @param xmlDataset
+   *   input Dataset with one XML object per record
+   * @since 4.0.0
+   */
+  def xml(xmlDataset: Dataset[String]): DataFrame =
+    parse(xmlDataset, ParseFormat.PARSE_FORMAT_UNSPECIFIED)
+
   /**
    * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
    * other overloaded `parquet()` method for more details.
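
A minimal usage sketch of the new reader entry points (the path is illustrative; rowTag is a pre-existing spark-xml option naming the row element):

// Read XML files from a path, inferring the schema:
val books = spark.read
  .option("rowTag", "book")
  .xml("/data/books.xml")

// Parse an in-memory Dataset[String], one XML record per element:
import spark.implicits._
val ds = Seq("<book><title>Spark</title></book>").toDS()
val parsed = spark.read.option("rowTag", "book").xml(ds)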

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala (+28)

@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1], [data2]]}
+   *
+   * would produce a XML file below. {@code <fieldA> <item>data1</item> </fieldA> <fieldA>
+   * <item>data2</item> </fieldA>}
+   *
+   * Namely, roundtrip in writing and reading can end up in different schema structure.
+   *
+   * You can find the XML-specific options for writing XML files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): Unit = {
+    format("xml").save(path)
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
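
A matching sketch of the write path (output path is illustrative; rootTag and rowTag are pre-existing spark-xml write options):

books.write
  .option("rootTag", "books")   // name of the enclosing root element
  .option("rowTag", "book")     // element written per row
  .xml("/output/books-xml")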

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala (+106)

@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a XML string into the data type corresponding to
+   * the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)
+
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.
+   *
+   * @param xml
+   *   a foldable string column containing XML data.
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @return
+   *   a column with string literal containing schema in DDL format.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {
+    fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
+  }
+
   /**
    * Returns the total number of elements in the array. The function returns null for null input.
    *
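
A hedged usage sketch of the two new functions (the DataFrame `raw` with a string column `value` is assumed, not part of this commit):

import org.apache.spark.sql.functions.{col, from_xml, lit, schema_of_xml}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Parse XML strings into a struct column using an explicit schema:
val schema = StructType(Seq(StructField("title", StringType)))
val parsed = raw.select(from_xml(col("value"), schema).as("book"))

// Infer a schema in DDL format from a sample record:
val ddl = spark.range(1)
  .select(schema_of_xml(lit("<book><title>t</title></book>")))
  .head()
  .getString(0)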

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala (+18)

@@ -189,6 +189,24 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
    */
   def csv(path: String): DataFrame = format("csv").load(path)
 
+  /**
+   * Loads a XML file stream and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
+   * sets the maximum number of new files to be considered in every trigger.</li> </ul>
+   *
+   * You can find the XML-specific options for reading XML file stream in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = format("xml").load(path)
+
   /**
    * Loads a ORC file stream, returning the result as a `DataFrame`.
    *
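
A sketch of the streaming read this enables (directory and schema are illustrative; file-stream sources need an explicit schema unless streaming schema inference is turned on):

import org.apache.spark.sql.types.{StringType, StructType}

val bookSchema = new StructType().add("title", StringType)

val stream = spark.readStream
  .schema(bookSchema)                // explicit schema for the file stream
  .option("rowTag", "book")          // pre-existing spark-xml option
  .option("maxFilesPerTrigger", 1)   // option documented in the scaladoc above
  .xml("/data/incoming-books/")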

docs/sql-error-conditions-datatype-mismatch-error-class.md (+4)

@@ -123,6 +123,10 @@ The `<functionName>` does not support ordering on type `<dataType>`.
 
 `<errors>`
 
+## INVALID_XML_MAP_KEY_TYPE
+
+Input schema `<schema>` can only contain STRING as a key type for a MAP.
+
 ## IN_SUBQUERY_DATA_TYPE_MISMATCH
 
 The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [`<mismatchedColumns>`], left side: [`<leftType>`], right side: [`<rightType>`].

python/pyspark/sql/tests/test_functions.py (+6 −1)

@@ -81,7 +81,12 @@ def test_function_parity(self):
         missing_in_py = jvm_fn_set.difference(py_fn_set)
 
         # Functions that we expect to be missing in python until they are added to pyspark
-        expected_missing_in_py = set()
+        expected_missing_in_py = {
+            # TODO: XML functions will soon be added and removed from this list
+            # https://issues.apache.org/jira/browse/SPARK-44788
+            "from_xml",
+            "schema_of_xml",
+        }
 
         self.assertEqual(
             expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"

sql/catalyst/pom.xml (+4)

@@ -111,6 +111,10 @@
       <artifactId>univocity-parsers</artifactId>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ws.xmlschema</groupId>
+      <artifactId>xmlschema-core</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.datasketches</groupId>
       <artifactId>datasketches-java</artifactId>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala (+23)

@@ -117,4 +117,27 @@ object ExprUtils extends QueryErrorsBase {
     TypeCheckSuccess
   }
+
+  /**
+   * Check if the schema is valid for XML
+   *
+   * @param schema The schema to check.
+   * @return
+   *   `TypeCheckSuccess` if the schema is valid
+   *   `DataTypeMismatch` with an error if the schema is not valid
+   */
+  def checkXmlSchema(schema: DataType): TypeCheckResult = {
+    val isInvalid = schema.existsRecursively {
+      // XML map keys must be StringType
+      case MapType(keyType, _, _) if keyType != StringType => true
+      case _ => false
+    }
+    if (isInvalid) {
+      DataTypeMismatch(
+        errorSubClass = "INVALID_XML_MAP_KEY_TYPE",
+        messageParameters = Map("schema" -> toSQLType(schema)))
+    } else {
+      TypeCheckSuccess
+    }
+  }
 }
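
A small sketch of how the new check classifies schemas (ExprUtils is internal Catalyst API, shown here only to illustrate the rule):

import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.types.{IntegerType, LongType, MapType, StringType, StructField, StructType}

val ok  = StructType(Seq(StructField("m", MapType(StringType, LongType))))
val bad = StructType(Seq(StructField("m", MapType(IntegerType, LongType))))

ExprUtils.checkXmlSchema(ok)   // TypeCheckSuccess
ExprUtils.checkXmlSchema(bad)  // DataTypeMismatch with INVALID_XML_MAP_KEY_TYPE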
