Reputation: 3930
I'm trying to parse a wide, nested XML file into a DataFrame using the spark-xml library.
Here is an abbreviated schema definition (XSD):
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ItemExport">
<xs:complexType>
<xs:sequence>
<xs:element name="Item">
<xs:complexType>
<xs:sequence>
<xs:element name="ITEM_ID" type="xs:integer" />
<xs:element name="CONTEXT" type="xs:string" />
<xs:element name="TYPE" type="xs:string" />
...
<xs:element name="CLASSIFICATIONS">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="unbounded" name="CLASSIFICATION">
<xs:complexType>
<xs:sequence>
<xs:element name="CLASS_SCHEME" type="xs:string" />
<xs:element name="CLASS_LEVEL" type="xs:string" />
<xs:element name="CLASS_CODE" type="xs:string" />
<xs:element name="CLASS_CODE_NAME" type="xs:string" />
<xs:element name="EFFECTIVE_FROM" type="xs:dateTime" />
<xs:element name="EFFECTIVE_TO" type="xs:dateTime" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
The XML file containing the data looks something like this:
<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
<TIMEZONE>PT</TIMEZONE>
<Item>
<ITEM_ID>56</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
</Item>
...
<Item>
<ITEM_ID>763</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
<CLASSIFICATIONS>
<CLASSIFICATION>
<CLASS_SCHEME>AAU</CLASS_SCHEME>
<CLASS_LEVEL>1</CLASS_LEVEL>
<CLASS_CODE>14</CLASS_CODE>
<CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
<EFFECTIVE_FROM />
<EFFECTIVE_TO />
</CLASSIFICATION>
</CLASSIFICATIONS>
</Item>
</ItemExport>
Now, what's clear is that the rowTag needs to be Item, but I've encountered an issue regarding the XSD: the row schema is encapsulated within the document schema.
import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.functions._
val inputFile = "dbfs:/samples/ItemExport.xml"
val schema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
// df1: let spark-xml infer the schema from the data
val df1 = spark.read.option("rowTag", "Item").xml(inputFile)
// df2: apply the full document schema derived from the XSD
val df2 = spark.read.schema(schema).xml(inputFile)
I basically want to get the struct under Item (under the root element), not the entire document schema.
schema.printTreeString
root
|-- ItemExport: struct (nullable = false)
| |-- Item: struct (nullable = false)
| | |-- ITEM_ID: integer (nullable = false)
| | |-- CONTEXT: string (nullable = false)
| | |-- TYPE: string (nullable = false)
... (a few more fields) ...
| | |-- CLASSIFICATIONS: struct (nullable = false)
| | | |-- CLASSIFICATION: array (nullable = false)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- CLASS_SCHEME: string (nullable = false)
| | | | | |-- CLASS_LEVEL: string (nullable = false)
| | | | | |-- CLASS_CODE: string (nullable = false)
| | | | | |-- CLASS_CODE_NAME: string (nullable = false)
| | | | | |-- EFFECTIVE_FROM: timestamp (nullable = false)
| | | | | |-- EFFECTIVE_TO: timestamp (nullable = false)
In the case above, parsing with the document schema yields an empty DataFrame:
df2.show()
+-----------+
| ItemExport|
+-----------+
+-----------+
while the inferred schema is basically correct, it can only pick up nested columns when they are present in the data (which is not always the case):
df1.show()
+----------+--------------------+----------+---------------+
| ITEM_ID| CONTEXT| TYPE|CLASSIFICATIONS|
+----------+--------------------+----------+---------------+
| 56| Sample | Product| {null}|
| 57| Sample | Product| {null}|
| 59| Part | Component| {null}|
| 60| Part | Component| {null}|
| 61| Sample | Product| {null}|
| 62| Sample | Product| {null}|
| 63| Assembly | Product| {null}|
df1.printSchema
root
|-- ITEM_ID: long (nullable = true)
|-- CONTEXT: string (nullable = false)
|-- TYPE: string (nullable = true)
...
|-- CLASSIFICATIONS: struct (nullable = true)
| |-- CLASSIFICATION: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CLASS_CODE: long (nullable = true)
| | | |-- CLASS_CODE_NAME: string (nullable = true)
| | | |-- CLASS_LEVEL: long (nullable = true)
| | | |-- CLASS_SCHEME: string (nullable = true)
| | | |-- EFFECTIVE_FROM: string (nullable = true)
| | | |-- EFFECTIVE_TO: string (nullable = true)
As described here and in the spark-xml docs ("Path to an XSD file that is used to validate the XML for each row individually"), I can parse into a given row-level schema like this:
import org.apache.spark.sql.types._

val structschema = StructType(
  Array(
    StructField("ITEM_ID", IntegerType, false),
    StructField("CONTEXT", StringType, false),
    StructField("TYPE", StringType, false)
  )
)
val df_struct = spark.read.schema(structschema).option("rowTag", "Item").xml(inputFile)
However, I'd like to obtain the schema for the nested columns from the XSD. How do I go about this, given the document schema above?
Version info: Scala 2.12, Spark 3.1.1, spark-xml 0.12.0
Upvotes: 2
Views: 1361
Reputation: 391
I'm glad you found my post a bit useful! :)
I'm not sure if this is exactly what you are looking for, but I've noticed that in your case you could also let spark-xml infer the schema from the XML.
Using this XML as an example:
<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
<TIMEZONE>PT</TIMEZONE>
<Item>
<ITEM_ID>56</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
</Item>
<Item>
<ITEM_ID>763</ITEM_ID>
<CONTEXT>Sample763</CONTEXT>
<TYPE>Product2</TYPE>
<CLASSIFICATIONS>
<CLASSIFICATION>
<CLASS_SCHEME>AAU</CLASS_SCHEME>
<CLASS_LEVEL>1</CLASS_LEVEL>
<CLASS_CODE>14</CLASS_CODE>
<CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
<EFFECTIVE_FROM/>
<EFFECTIVE_TO/>
</CLASSIFICATION>
<CLASSIFICATION>
<CLASS_SCHEME>AXU</CLASS_SCHEME>
<CLASS_LEVEL>2</CLASS_LEVEL>
<CLASS_CODE>16</CLASS_CODE>
<CLASS_CODE_NAME>BizProd</CLASS_CODE_NAME>
<EFFECTIVE_FROM/>
<EFFECTIVE_TO/>
</CLASSIFICATION>
</CLASSIFICATIONS>
</Item>
</ItemExport>
and this Spark code snippet:
val df = spark.read
  .option("mode", "FAILFAST")
  .option("nullValue", "")
  .option("rootTag", "ItemExport")
  .option("rowTag", "Item")
  .option("ignoreSurroundingSpaces", "true")
  // .schema(schema) // commented out so the schema is inferred
  .xml("pathTo/testing.xml")
  .selectExpr("*")

df.printSchema()
df.show()
I got the following schema:
|-- CLASSIFICATIONS: struct (nullable = true)
| |-- CLASSIFICATION: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CLASS_CODE: long (nullable = true)
| | | |-- CLASS_CODE_NAME: string (nullable = true)
| | | |-- CLASS_LEVEL: long (nullable = true)
| | | |-- CLASS_SCHEME: string (nullable = true)
| | | |-- EFFECTIVE_FROM: string (nullable = true)
| | | |-- EFFECTIVE_TO: string (nullable = true)
|-- CONTEXT: string (nullable = true)
|-- ITEM_ID: long (nullable = true)
|-- TYPE: string (nullable = true)
It also seems to work with the following XSD:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ITEM_ID" type="xs:integer"/>
<xs:element name="CONTEXT" type="xs:string"/>
<xs:element name="TYPE" type="xs:string"/>
<xs:element minOccurs="0" name="CLASSIFICATIONS">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="unbounded" name="CLASSIFICATION">
<xs:complexType>
<xs:sequence>
<xs:element name="CLASS_SCHEME" type="xs:string"/>
<xs:element name="CLASS_LEVEL" type="xs:string"/>
<xs:element name="CLASS_CODE" type="xs:string"/>
<xs:element name="CLASS_CODE_NAME" type="xs:string"/>
<xs:element minOccurs="0" name="EFFECTIVE_FROM" type="xs:dateTime"/>
<xs:element minOccurs="0" name="EFFECTIVE_TO" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
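For completeness, here is a minimal sketch of how that XSD can be turned into the schema variable used further below; the Item.xsd filename and path are my assumptions. As far as I can tell, XSDToSchema maps each top-level element of the XSD to a field of the returned StructType, which is why a flat list of elements yields a row-level schema directly:
import com.databricks.spark.xml.util.XSDToSchema
import java.nio.file.Paths

// "pathTo/Item.xsd" is a hypothetical location for the row-level XSD above
val schema = XSDToSchema.read(Paths.get("pathTo/Item.xsd"))
schema.printTreeString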
If you want to get the nested CLASSIFICATION rows as actual rows in your DataFrame, it seems you can opt to use the explode_outer function (I'm not sure about the performance/memory impact of this). So you could do something like the following:
// Starting transformation
import spark.implicits._
import org.apache.spark.sql.functions.explode_outer
val df = spark.read
  .option("mode", "FAILFAST")
  .option("nullValue", "")
  .option("rootTag", "ItemExport")
  .option("rowTag", "Item")
  .option("ignoreSurroundingSpaces", "true")
  .schema(schema) // notice I'm using the XSD-derived schema this time :)
  .xml("pathTo/testing.xml")
  .select($"ITEM_ID", $"CONTEXT", $"TYPE", explode_outer($"CLASSIFICATIONS.CLASSIFICATION"))
  .select($"ITEM_ID", $"CONTEXT", $"TYPE",
    $"col.CLASS_SCHEME", $"col.CLASS_LEVEL", $"col.CLASS_CODE", $"col.CLASS_CODE_NAME",
    $"col.EFFECTIVE_FROM", $"col.EFFECTIVE_TO")

df.printSchema()
df.show()
In this case my DataFrame shows the results below:
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
|ITEM_ID| CONTEXT| TYPE|CLASS_SCHEME|CLASS_LEVEL|CLASS_CODE|CLASS_CODE_NAME|EFFECTIVE_FROM|EFFECTIVE_TO|
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
| 56| Sample| Product| null| null| null| null| null| null|
| 763|Sample763|Product2| AAU| 1| 14| BizDev| null| null|
| 763|Sample763|Product2| AXU| 2| 16| BizProd| null| null|
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
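As a side note, I believe the second select could be shortened with a star expansion on the struct column produced by explode_outer (named col by default). This is untested shorthand on my part, not something from the spark-xml docs:
val flattened = spark.read
  .option("rootTag", "ItemExport")
  .option("rowTag", "Item")
  .schema(schema)
  .xml("pathTo/testing.xml")
  .select($"ITEM_ID", $"CONTEXT", $"TYPE", explode_outer($"CLASSIFICATIONS.CLASSIFICATION"))
  .select($"ITEM_ID", $"CONTEXT", $"TYPE", $"col.*") // expands every CLASSIFICATION field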
I hope this can somehow help in your use case.
One note on the modified XSD: the minOccurs="0" attributes that mark elements as optional are only needed on the fields that can be missing according to the example XML you provided, namely CLASSIFICATIONS, EFFECTIVE_FROM and EFFECTIVE_TO.
Upvotes: 0
Reputation: 10382
The columns in the XSD are required (nullable=false), but some of the columns in the XML file are null. To make the schema match the XML file content, change it from nullable=false
to nullable=true.
Try the following code.
import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val inputFile = "dbfs:/samples/ItemExport.xml"
Get the schema from the XSD, then apply it to an empty DataFrame and select into the nested Item struct to extract the required columns:
val schema = spark
  .createDataFrame(
    spark.sparkContext.emptyRDD[Row],
    XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
  )
  .select("ItemExport.Item.*") // unwrap the document schema down to the Item struct
  .schema
val df2 = spark.read
.option("rootTag", "ItemExport")
.option("rowTag", "Item")
  .schema(setNullable(schema, true)) // set all columns optional (nullable=true) to match the XSD to the XML content
.xml(inputFile)
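A quick sanity check: the printed tree should now start at the Item-level fields (ITEM_ID, CONTEXT, TYPE, ...), all nullable:
df2.printSchema()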
The function below makes all columns optional, i.e. nullable=true:
def setNullable(schema: StructType, nullable: Boolean = false): StructType = {
  def recurNullable(schema: StructType): Seq[StructField] =
    schema.fields.map {
      case StructField(name, dtype: StructType, _, meta) =>
        StructField(name, StructType(recurNullable(dtype)), nullable, meta)
      case StructField(name, dtype: ArrayType, _, meta) => dtype.elementType match {
        case struct: StructType =>
          StructField(name, ArrayType(StructType(recurNullable(struct)), true), nullable, meta)
        case other =>
          StructField(name, ArrayType(other, true), nullable, meta) // keep the ArrayType wrapper
      }
      case StructField(name, dtype, _, meta) =>
        StructField(name, dtype, nullable, meta)
    }
  StructType(recurNullable(schema))
}
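For example, to double-check the conversion:
// every field in the printed tree should now report nullable = true
val nullableSchema = setNullable(schema, true)
nullableSchema.printTreeString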
Upvotes: 2