[SPARK-44751][SQL] XML FileFormat Interface implementation #42462

Closed · wants to merge 12 commits
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
@@ -589,6 +589,11 @@
"<errors>"

This is great! I was thinking of upgrading the XML reader to Data Source V2 before, but was really stopped by the refactoring work involved. Thanks for adding it to the Spark mainline to unify the interfaces and catch up with the main changes.

Contributor Author: Thanks for reviewing this PR.

You're welcome! This is great!

]
},
"INVALID_XML_MAP_KEY_TYPE" : {
"message" : [
"Input schema <schema> can only contain STRING as a key type for a MAP."
]
},
"IN_SUBQUERY_DATA_TYPE_MISMATCH" : {
"message" : [
"The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [<mismatchedColumns>], left side: [<leftType>], right side: [<rightType>]."
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
def csv(csvDataset: Dataset[String]): DataFrame =
parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)

/**
* Loads an XML file and returns the result as a `DataFrame`. See the documentation on the other
* overloaded `xml()` method for more details.
*
* @since 4.0.0
*/
def xml(path: String): DataFrame = {
// This method ensures that calls that explicitly need a single argument work, see SPARK-16009
xml(Seq(path): _*)
}

/**
* Loads XML files and returns the result as a `DataFrame`.
*
* This function will go through the input once to determine the input schema if `inferSchema`
* is enabled. To avoid going through the entire data once, disable `inferSchema` option or
* specify the schema explicitly using `schema`.
*
* You can find the XML-specific options for reading XML files in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
* Data Source Option</a> in the version you use.
*
* @since 4.0.0
*/
@scala.annotation.varargs
def xml(paths: String*): DataFrame = format("xml").load(paths: _*)

/**
* Loads a `Dataset[String]` storing XML objects and returns the result as a `DataFrame`.
*
* If the schema is not specified using `schema` function and `inferSchema` option is enabled,
* this function goes through the input once to determine the input schema.
*
* @param xmlDataset
* input Dataset with one XML object per record
* @since 4.0.0
*/
def xml(xmlDataset: Dataset[String]): DataFrame =
parse(xmlDataset, ParseFormat.PARSE_FORMAT_UNSPECIFIED)

/**
* Loads a Parquet file, returning the result as a `DataFrame`. See the documentation on the
* other overloaded `parquet()` method for more details.
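For context, here is a quick usage sketch of the new reader API (editorial, not part of this diff; the paths, the schema string, and the `rowTag` value are illustrative, and a SparkSession named `spark` is assumed):

import spark.implicits._

// Read XML files from a path, supplying a schema to skip inference.
val booksDf = spark.read
  .option("rowTag", "book") // XML element treated as a row
  .schema("title STRING, price DOUBLE")
  .xml("/tmp/books.xml")

// Parse a Dataset[String] holding one XML record per element.
val xmlDs = Seq("<book><title>T1</title><price>1.0</price></book>").toDS()
val parsedDf = spark.read.option("rowTag", "book").xml(xmlDs)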
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
format("csv").save(path)
}

/**
* Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
* to:
* {{{
* format("xml").save(path)
* }}}
*
* Note that writing an XML file from a `DataFrame` having a field of `ArrayType` with its
* element as `ArrayType` would have an additional nested field for the element. For example, the
* `DataFrame` having a field below,
*
* {@code fieldA [[data1], [data2]]}
*
* would produce an XML file below. {@code <fieldA> <item>data1</item> </fieldA> <fieldA>
* <item>data2</item> </fieldA>}
Contributor: is this the fixed version?

Contributor Author: yes. Changed:
{@code fieldA [[data1, data2]]}
to
{@code fieldA [[data1], [data2]]}
*
* Namely, a roundtrip of writing and reading can end up with a different schema structure.
*
* You can find the XML-specific options for writing XML files in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
* Data Source Option</a> in the version you use.
*
* @since 4.0.0
*/
def xml(path: String): Unit = {
format("xml").save(path)
}

///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
///////////////////////////////////////////////////////////////////////////////////////
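A matching write-path sketch (editorial, not part of the diff; the output path is made up, and `rowTag`/`rootTag` are standard XML data source options):

// Save a DataFrame as XML; each row becomes a <book> element under a <books> root.
booksDf.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode("overwrite")
  .xml("/tmp/books_out")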
@@ -7227,6 +7227,112 @@ object functions {
*/
def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())

// scalastyle:off line.size.limit
/**
* Parses a column containing an XML string into the data type corresponding to the specified
* schema. Returns `null` in the case of an unparseable string.
*
* @param e
* a string column containing XML data.
* @param schema
* the schema to use when parsing the XML string
* @param options
* options to control how the XML is parsed. Accepts the same options as the XML data source.
* See <a href=
* "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
* Source Option</a> in the version you use.
* @group collection_funcs
*
* @since 4.0.0
*/
// scalastyle:on line.size.limit
def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
from_xml(e, lit(schema.toDDL), options.iterator)

Contributor: the schema parameter can be StructType or Column, the options parameter can be a Scala or Java map, or omitted. This means we need 6 overloads of from_xml.

Is it really worth it? I know we did the same thing for from_json, but this is really convoluted.

How about something like

TextParsingFunction.newBuilder()
  .withSchema(...) // It has multiple overloads
  .withOptions(...) // It has multiple overloads
  .xml() // returns a Column

Anyway, it's unrelated to this PR. We can do it later. cc @HyukjinKwon

HyukjinKwon (Member, Aug 22, 2023): I think we can remove the (Scala-specific) signature with a Scala map for now. For the same reason, we don't have that Scala-specific version of to_csv, etc.

Contributor Author: from_csv has just two. I can trim from_xml overloads too. Let me know.

Member: Let's remove this signature with Scala map for now in a followup.

// scalastyle:off line.size.limit

/**
* (Java-specific) Parses a column containing an XML string into the data type corresponding to
* the specified schema. Returns `null` in the case of an unparseable string.
*
* @param e
* a string column containing XML data.
* @param schema
* the schema to use when parsing the XML string
* @param options
* options to control how the XML is parsed. Accepts the same options as the XML data source.
* See <a href=
* "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
* Source Option</a> in the version you use.
* @group collection_funcs
*
* @since 4.0.0
*/
// scalastyle:on line.size.limit
def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
from_xml(e, schema, options.asScala.iterator)

/**
* Parses a column containing an XML string into the data type corresponding to the specified
* schema. Returns `null` in the case of an unparseable string.
*
* @param e
* a string column containing XML data.
* @param schema
* the schema to use when parsing the XML string
* @group collection_funcs
*
* @since 4.0.0
*/
def from_xml(e: Column, schema: StructType): Column =
from_xml(e, schema, Map.empty[String, String])

private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
fnWithOptions("from_xml", options, e, schema)
}

/**
* Parses an XML string and infers its schema in DDL format.
*
* @param xml
* an XML string.
* @group collection_funcs
* @since 4.0.0
*/
def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))

/**
* Parses an XML string and infers its schema in DDL format.
*
* @param xml
* a foldable string column containing an XML string.
* @group collection_funcs
* @since 4.0.0
*/
def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)

// scalastyle:off line.size.limit

/**
* Parses an XML string and infers its schema in DDL format using options.
*
* @param xml
* a foldable string column containing XML data.
* @param options
* options to control how the XML is parsed. Accepts the same options as the XML data source.
* See <a href=
* "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
* Source Option</a> in the version you use.
* @return
* a column with string literal containing schema in DDL format.
* @group collection_funcs
* @since 4.0.0
*/
// scalastyle:on line.size.limit
def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {
fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
}

Contributor: there is no to_xml?

Contributor Author: Not yet. I have filed a sub-task: https://issues.apache.org/jira/browse/SPARK-44790

/**
* Returns the total number of elements in the array. The function returns null for null input.
*
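To show how the new functions compose (editorial sketch; the `logs` DataFrame and its `payload` column are hypothetical):

import org.apache.spark.sql.functions.{col, from_xml, lit, schema_of_xml}
import org.apache.spark.sql.types._

// Parse an XML string column with an explicit schema.
val recSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val parsed = logs
  .select(from_xml(col("payload"), recSchema).as("rec"))
  .select(col("rec.id"), col("rec.name"))

// Or infer a DDL schema string from a sample record first.
val inferred = logs.select(schema_of_xml(lit("<r><id>1</id><name>a</name></r>")))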
@@ -189,6 +189,24 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
*/
def csv(path: String): DataFrame = format("csv").load(path)

/**
* Loads an XML file stream and returns the result as a `DataFrame`.
*
* This function will go through the input once to determine the input schema if `inferSchema`
* is enabled. To avoid going through the entire data once, disable `inferSchema` option or
* specify the schema explicitly using `schema`.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
*
* You can find the XML-specific options for reading an XML file stream in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
* Data Source Option</a> in the version you use.
*
* @since 4.0.0
*/
def xml(path: String): DataFrame = format("xml").load(path)

/**
* Loads a ORC file stream, returning the result as a `DataFrame`.
*
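A streaming-read sketch for completeness (editorial; the directory, schema, and trigger limit are illustrative, and streaming file sources generally require an explicit schema up front):

// Continuously pick up new XML files dropped into a directory.
val events = spark.readStream
  .schema("event STRING, ts TIMESTAMP")
  .option("rowTag", "event")
  .option("maxFilesPerTrigger", "10")
  .xml("/data/incoming-xml/")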
4 changes: 4 additions & 0 deletions docs/sql-error-conditions-datatype-mismatch-error-class.md
@@ -123,6 +123,10 @@ The `<functionName>` does not support ordering on type `<dataType>`.

`<errors>`

## INVALID_XML_MAP_KEY_TYPE

Input schema `<schema>` can only contain STRING as a key type for a MAP.

## IN_SUBQUERY_DATA_TYPE_MISMATCH

The data type of one or more elements in the left hand side of an IN subquery is not compatible with the data type of the output of the subquery. Mismatched columns: [`<mismatchedColumns>`], left side: [`<leftType>`], right side: [`<rightType>`].
7 changes: 6 additions & 1 deletion python/pyspark/sql/tests/test_functions.py
@@ -78,7 +78,12 @@ def test_function_parity(self):
missing_in_py = jvm_fn_set.difference(py_fn_set)

# Functions that we expect to be missing in python until they are added to pyspark
expected_missing_in_py = set()
expected_missing_in_py = {
# TODO: XML functions will soon be added to PySpark and then removed from this list
# https://issues.apache.org/jira/browse/SPARK-44788
"from_xml",
"schema_of_xml",
}

self.assertEqual(
expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"
4 changes: 4 additions & 0 deletions sql/catalyst/pom.xml
@@ -111,6 +111,10 @@
<artifactId>univocity-parsers</artifactId>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.ws.xmlschema</groupId>
<artifactId>xmlschema-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
@@ -117,4 +117,27 @@ object ExprUtils extends QueryErrorsBase {
TypeCheckSuccess
}
}

/**
* Check if the schema is valid for XML
*
* @param schema The schema to check.
* @return
* `TypeCheckSuccess` if the schema is valid
* `DataTypeMismatch` with an error if the schema is not valid
*/
def checkXmlSchema(schema: DataType): TypeCheckResult = {
val isInvalid = schema.existsRecursively {
// Map keys are written as XML element names, so they must be StringType
case MapType(keyType, _, _) if keyType != StringType => true
case _ => false
}
if (isInvalid) {
DataTypeMismatch(
errorSubClass = "INVALID_XML_MAP_KEY_TYPE",
messageParameters = Map("schema" -> toSQLType(schema)))
} else {
TypeCheckSuccess
}
}
}
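A small illustration of what checkXmlSchema accepts and rejects (editorial sketch; the schemas are made up, and the calls are shown as comments since ExprUtils is an internal catalyst utility):

import org.apache.spark.sql.types._

// Map keys become XML element names, so only STRING keys pass the check.
val good = StructType(Seq(StructField("attrs", MapType(StringType, StringType))))
val bad = StructType(Seq(StructField("counts", MapType(IntegerType, LongType))))

// ExprUtils.checkXmlSchema(good) => TypeCheckSuccess
// ExprUtils.checkXmlSchema(bad)  => DataTypeMismatch with error subclass INVALID_XML_MAP_KEY_TYPE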