Ujjal

Reputation: 11

Load XML file to dataframe in PySpark using 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # Call spark-xml's Scala from_xml function through the JVM gateway
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # Infer the schema of a single-column DataFrame of XML strings
    assert len(df.columns) == 1
    scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())

df = spark.read.format('delta').table("tablename")
payloadSchema = ext_schema_of_xml_df(df.select("Columnname"))

On doing so, I get the following error:

java.lang.NoClassDefFoundError: Could not initialize class com.databricks.spark.xml.util.PermissiveMode$

Note: the Maven coordinate used is com.databricks:spark-xml_2.12:0.14.0.

I am looking for a version of com.databricks:spark-xml that is compatible with Databricks Runtime 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12).

Upvotes: 0

Views: 829

Answers (1)

Vamsi Bitra

Reputation: 2764

I reproduced the above code in my environment with a slight reframing and got the results below:

from pyspark.sql import SparkSession
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string

def ext_from_xml(xml_column, schema, options={}):
    # Call spark-xml's Scala from_xml function through the JVM gateway
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    # Infer the schema of a single-column DataFrame of XML strings
    assert len(df.columns) == 1
    scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())


# Create sample data as DataFrame
df = spark.createDataFrame([
    (1, '<book><title>nam1232</title><author>vambl123</author></book>'),
    (2, '<book><title>dem123</title><author>sam13</author></book>'),
    (3, '<book><title>sj123</title><author>nam12</author></book>')
], ["id", "xml_data4"])

# Write df to Delta table
df.write.format("delta").mode("overwrite").save("/delta/sam")
# Read df from Delta table
df = spark.read.format("delta").load("/delta/sam")

# Select the required column and call the function to extract the schema of the XML data
payloadSchema = ext_schema_of_xml_df(df.select("xml_data4"))
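
With the inferred schema in hand, the XML column can then be parsed into a struct using the ext_from_xml helper defined above. A minimal sketch, assuming the sample data from this answer (the "parsed" column name is illustrative):

# Parse each XML string into a struct column using the inferred schema
parsed = df.withColumn("parsed", ext_from_xml(df["xml_data4"], payloadSchema))
parsed.show(truncate=False)

# The nested fields inferred from the sample XML (e.g. title, author)
# can then be selected directly from the struct
parsed.select("id", "parsed.title", "parsed.author").show()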

If you want to save the Delta table under a particular table name, use saveAsTable("table_name").

To write data into a particular table:

df.write.format("delta").mode("overwrite").saveAsTable("Table _name")


For more information, refer to this similar SO thread.

Upvotes: 0
