Reputation: 11
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
df = spark.read.format('delta').table("tablename")
payloadSchema = ext_schema_of_xml_df(df.select("Columnname"))
On doing so, I get the following error:
java.lang.NoClassDefFoundError: Could not initialize class com.databricks.spark.xml.util.PermissiveMode$
Kindly note: the Maven coordinate used is com.databricks:spark-xml_2.12:0.14.0.
I am expecting a version of com.databricks:spark-xml that is compatible with Databricks Runtime 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12).
Upvotes: 0
Views: 829
Reputation: 2764
I reproduced the same code above in my environment with minor reframing, and got the results below:
from pyspark.sql import SparkSession
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

# Wrapper around spark-xml's Scala from_xml function
def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

# Infer the schema of a single-column DataFrame of XML strings
def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1
    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
# Create sample data as DataFrame
df = spark.createDataFrame([
    (1, '<book><title>nam1232</title><author>vambl123</author></book>'),
    (2, '<book><title>dem123</title><author>sam13</author></book>'),
    (3, '<book><title>sj123</title><author>nam12</author></book>')
], ["id", "xml_data4"])
# Write df to Delta table
df.write.format("delta").mode("overwrite").save("/delta/sam")
# Read df from Delta table
df = spark.read.format("delta").load("/delta/sam")
# Select the required column and call the function to extract the schema of the XML data
payloadSchema = ext_schema_of_xml_df(df.select("xml_data4"))
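Once payloadSchema has been inferred, the same ext_from_xml wrapper defined above can be used to parse the XML column into a struct. A minimal sketch, assuming the sample column name xml_data4 and the fields inferred from the sample data (title, author):
# Parse the XML strings with the inferred schema and add the result as a struct column
parsed_df = df.withColumn("parsed", ext_from_xml(df["xml_data4"], payloadSchema))
# Flatten the struct into top-level columns and inspect the result
parsed_df.select("id", "parsed.*").show(truncate=False)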
If you want to save the Delta table under a particular table name, use saveAsTable("Table_name").
Write data into a particular table:
df.write.format("delta").mode("overwrite").saveAsTable("Table_name")
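Once saved, the table can be read back by name, for example (Table_name is the placeholder used above):
df = spark.read.format("delta").table("Table_name")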
For more information, refer to this similar SO thread.
Upvotes: 0