sameer A
sameer A

Reputation: 13

Getting Multi Generator issue while flattening the XML file

I am getting Multi Generator issue while flattening the XML file using PySpark(Python).

XML is having 2 Arrays at same level. Please share if you have any other solution to flatten this XML.

Desired output is like: Output

Error: AnalysisException: [UNSUPPORTED_GENERATOR.MULTI_GENERATOR] The generator is not supported: only one generator allowed per SELECT clause but found 2: "generatorouter(explode(Child1.Child2.Child21))", "generatorouter(explode(Child1.Child3.Child31))".

XML:

<Body>
<Child0>5678</Child0>
<Child1>
    <Child2 name="HRA">
        <Child21>test 1</Child21>
        <Child21>test 2</Child21>
    </Child2>
    <Child3 name="LRA">
        <Child31>test 3</Child31>
        <Child31>test 4</Child31>
    </Child3>
</Child1>

Source Code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType
from pyspark.sql.functions import explode_outer

def flatten(df):
    f_df = df
    select_expr = _explodeArrays(element=f_df.schema)
    # While there is at least one Array, explode.
    while "ArrayType(" in f"{f_df.schema}": 
        f_df=f_df.selectExpr(select_expr)
        select_expr = _explodeArrays(element=f_df.schema)

    # Flatten the structure
    select_expr = flattenExpr(f_df.schema)
    f_df = f_df.selectExpr(select_expr)
    return f_df    

def _explodeArrays(element, root=None):
    el_type = type(element)
    expr = []
    try:
        _path = f"{root+'.' if root else ''}{element.name}"
    except AttributeError:
        _path = ""

    if el_type == StructType:
        for t in element:
            res = _explodeArrays(t, root)
            expr.extend(res)
    elif el_type == StructField and type(element.dataType) == ArrayType:
        expr.append(f"explode_outer({_path}) as {_path.replace('.','_')}")
    elif el_type == StructField and type(element.dataType) == StructType:
        expr.extend(_explodeArrays(element.dataType, _path))
    else:   
        expr.append(f"{_path}  as {_path.replace('.','_')}")

    return expr

def flattenExpr(element, root=None):
    expr = []
    el_type = type(element)
    try:
        _path = f"{root+'.' if root else ''}{element.name}"
    except AttributeError:
        _path = ""
    if el_type == StructType:
        for t in element:
            expr.extend(flattenExpr(t, root))
    elif el_type == StructField and type(element.dataType) == StructType:
        expr.extend(flattenExpr(element.dataType, _path))
    elif el_type == StructField and type(element.dataType) == ArrayType:
        # You should use flattenArrays to be sure this will not happen
        expr.extend(flattenExpr(element.dataType.elementType, f"{_path}[0]"))
    else:
        expr.append(f"{_path} as {_path.replace('.','_')}")
    return expr


spark = SparkSession.builder.getOrCreate()

path = 'Files/Test9.xml'
df = spark.read.format('xml').options(rowTag='Body', ignoreNamespace='true').load(path)

display('******* Initial Data Frame of XML file ********')
display(df)

display('******* Initial Schema of XML file ********')
df.printSchema()

f_df = flatten(df)

display('******* Flatten Schema of XML file ********')
f_df.printSchema()

display('******* Flatten  Data Frame of XML file ********')
display(f_df)

Upvotes: 0

Views: 64

Answers (1)

Hermann12
Hermann12

Reputation: 3476

Gives not such a table to the right as you ask for, but as an idea. You can use iterparse() to get the xpath for the content:

import xml.etree.ElementTree as ET
import pandas as pd

def path(file_path):
    tu = []
    xpath = []
    for event, elem in ET.iterparse(file_path, events=("start","end",)):
        if event == "start":
            xpath.append(elem.tag)
            if elem.get("name") is not None:
                e = ("/".join(xpath), elem.get("name"))
                tu.append(e)
        if event == "end":
            if elem.text is not None:
                t =("/".join(xpath), elem.text)
                if "\n " in t[1]:
                    pass
                else:
                    e = (t[0], e[1], elem.text)
                    tu.append(e)
                xpath.pop()
    return tu
            
tu = path("body.xml")

df = pd.DataFrame(tu, columns=["xpath", "name","text"])
print(df.to_string())

Output:

                        xpath name    text
0          Body/Child1/Child2  ERA    None
1  Body/Child1/Child2/Child21  ERA  test 1
2  Body/Child1/Child2/Child21  ERA  test 2
3          Body/Child1/Child3  LRA    None
4  Body/Child1/Child3/Child31  LRA  test 3
5  Body/Child1/Child3/Child31  LRA  test 4

Upvotes: 0

Related Questions