pythonsparkscala

Reputation: 45

Parse the XML column into multiple columns and transpose into rows based on count in Spark DataFrame

I have an XML column, response_output, that contains an OrderCount element and an orders element with the corresponding order details.

For example, in the XML below OrderCount is 4, and there are 4 order entries under orders:

<USR_ORD><OrderResponse><OrderResult>
<OrderCount>4</OrderCount>
<ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus>
<ORDValue>
<USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc>
<orders>
<order><name>MR RITA SOMA</name><address>606 JAL TXS</address><tracknumber>7825225</tracknumber><status>UNK</status></order>
<order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825226</tracknumber><status>FAIL</status></order>
<order><name>MR RODREX SOMA</name><address>18, GHC,BAN</address><tracknumber>7825224</tracknumber><status>SUC</status></order>
<order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825223</tracknumber><status>SUC</status></order>
</orders>
<USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD>
</ORDValue>
</OrderResult></OrderResponse></USR_ORD>

I need to retrieve records based on OrderCount: if OrderCount is 4, I need to iterate over orders 4 times and fetch 4 records with all the order details; if OrderCount is 1, I need to fetch 1 record with its order details.

Could anyone help me with a Spark 2 / Scala solution?

SourceData:


|customer_id|response_id|response_output|

|100        |1          |<USR_ORD><OrderResponse><OrderResult><OrderCount>1</OrderCount><ORDTime>2021-02-02 10:34:19</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>321</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><orders><order><name>MRS MITA PERS</name><address>17 MAXI RD CHN</address><tracknumber>7825222</tracknumber><status>FAIL</status><amount>4500</amount><orderdate>2019-10-18</orderdate></order></orders><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse|
|200        |1          |<USR_ORD><OrderResponse><OrderResult><OrderCount>4</OrderCount><ORDTime>2021-02-02 21:13:12</ORDTime><ORDStatus>COMPLETE</ORDStatus><ORDValue><USR1OrderTotalTime>221</USR1OrderTotalTime><USR1OrderKYC>{ND}</USR1OrderKYC><USR1OrderLoc>{ND}</USR1OrderLoc><orders><order><name>MR RITA SOMA</name><address>606 JAL TXS</address><tracknumber>7825225</tracknumber><status>UNK</status><amount>1030</amount><orderdate>2020-11-16</orderdate></order><order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825226</tracknumber><status>FAIL</status><amount>8000</amount><orderdate>2018-07-17</orderdate></order><order><name>MR RODREX SOMA</name><address>18, GHC, BAN</address><tracknumber>7825224</tracknumber><status>SUC</status><amount>2500</amount><orderdate>2017-09-16</orderdate></order><order><name>MR RITA SOMA</name><address>1 BAL, HAL</address><tracknumber>7825223</tracknumber><status>SUC</status><amount>2700</amount><orderdate>2017-04-22</orderdate></order></orders><USR1Orderqnt>10</USR1Orderqnt><USR1Orderxyz>0</USR1Orderxyz><USR1OrderD>{ND}</USR1OrderD></ORDValue></OrderResult></OrderResponse></USR_ORD>|


When I run the SQL below I get the following result, but I need 4 records for customer_id 200, since its OrderCount is 4, each with the corresponding order details.

 spark.sql("""select
          customer_id,
          xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/OrderCount') as OrderCount,
          xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name') as name,
          xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address') as address,
          xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber') as tracknumber,
          xpath_string(response_output,'USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status') as status
          from cust_tbl""").show()
     

Result I am getting:

+-----------+----------+-------------+--------------+-----------+------+
|customer_id|OrderCount|         name|       address|tracknumber|status|
+-----------+----------+-------------+--------------+-----------+------+
|        100|         1|MRS MITA PERS|17 MAXI RD CHN|    7825222|  FAIL|
|        200|         4| MR RITA SOMA|   606 JAL TXS|    7825225|   UNK|
+-----------+----------+-------------+--------------+-----------+------+

Expected output:

+-----------+----------+------------+-----------+-----------+------+
|customer_id|OrderCount|name        |address    |tracknumber|status|
+-----------+----------+------------+-----------+-----------+------+
|200        |4         |MRRITASOMA  |606JALTXS  |7825225    |UNK   |
|200        |4         |MRRITASOMA  |1BAL HAL   |7825226    |FAIL  |
|200        |4         |MRRODREXSOMA|18 GHC BAN |7825224    |SUC   |
|200        |4         |MRRITASOMA  |1 BAL HAL  |7825223    |SUC   |
|100        |1         |MRSMITAPERS |17MAXIRDCHN|7825222    |FAIL  |
+-----------+----------+------------+-----------+-----------+------+

Upvotes: 0

Views: 1279

Answers (1)

blackbishop

Reputation: 32680

The function xpath_string returns a single string value for the given XPath expression, which is why you only see the first order. In your case, use xpath instead to get an array of the node values for each order field (name, status, ...), then zip the arrays together with arrays_zip (available from Spark 2.4) and explode the result:

import org.apache.spark.sql.functions.{arrays_zip, explode, expr}

val df1 = df.withColumn(
    "OrderCount",
    expr("xpath_string(response_output, 'USR_ORD/OrderResponse/OrderResult/OrderCount')")
).withColumn(
    "orders",
    // one output row per zipped element, i.e. per <order>
    explode(
        // xpath returns one array per field; arrays_zip pairs them up by position
        arrays_zip(
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber/text()')"),
            expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status/text()')")
        // the cast only renames the struct fields
        ).cast("array<struct<name:string,address:string,tracknumber:string,status:string>>")
    )
).select("customer_id", "OrderCount", "orders.*")

df1.show(false)

//+-----------+----------+--------------+--------------+-----------+------+
//|customer_id|OrderCount|name          |address       |tracknumber|status|
//+-----------+----------+--------------+--------------+-----------+------+
//|100        |1         |MRS MITA PERS |17 MAXI RD CHN|7825222    |FAIL  |
//|200        |4         |MR RITA SOMA  |606 JAL TXS   |7825225    |UNK   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825226    |FAIL  |
//|200        |4         |MR RODREX SOMA|18, GHC, BAN  |7825224    |SUC   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825223    |SUC   |
//+-----------+----------+--------------+--------------+-----------+------+
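To see why xpath_string gives one value while xpath gives all of them, here is a minimal sketch in plain Scala. It uses the JDK's javax.xml.xpath API as a stand-in, so it is not Spark code; the object name XPathDemo, its helper methods, and the shortened two-order XML are made up for illustration. Evaluating a path as a string keeps only the first matching node, while evaluating it as a node set keeps every match, and zipping the per-field results by position gives one tuple per order.

```scala
import java.io.StringReader
import javax.xml.xpath.{XPathConstants, XPathFactory}
import org.w3c.dom.NodeList
import org.xml.sax.InputSource

// Hypothetical helper object, for illustration only (not part of Spark).
object XPathDemo {
  // Shortened, made-up sample with two orders.
  val xml: String =
    "<orders>" +
      "<order><name>MR RITA SOMA</name><status>UNK</status></order>" +
      "<order><name>MR RODREX SOMA</name><status>SUC</status></order>" +
    "</orders>"

  private def newXPath = XPathFactory.newInstance().newXPath()
  private def source = new InputSource(new StringReader(xml))

  // String evaluation: a node set is reduced to the string value of its first node.
  def firstValue(path: String): String =
    newXPath.evaluate(path, source, XPathConstants.STRING).asInstanceOf[String]

  // Node-set evaluation: every matching node is returned.
  def allValues(path: String): Seq[String] = {
    val nodes = newXPath.evaluate(path, source, XPathConstants.NODESET).asInstanceOf[NodeList]
    (0 until nodes.getLength).map(i => nodes.item(i).getNodeValue)
  }

  // Zipping the per-field sequences by position yields one tuple per order.
  def rows: Seq[(String, String)] =
    allValues("/orders/order/name/text()").zip(allValues("/orders/order/status/text()"))
}
```

Note that the zip is purely positional. If some order element were missing one of its fields, the arrays would have different lengths and the values would no longer line up, so this approach assumes every order carries all the fields.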

Update

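For Spark < 2.4, you can posexplode each array column and join the results on the index:

import org.apache.spark.sql.functions.{col, expr}

// extract one array per order field
val df1 = df.withColumn(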
    "OrderCount",
    expr("xpath_string(response_output, 'USR_ORD/OrderResponse/OrderResult/OrderCount')")
  ).select(  
    col("customer_id"),
    col("OrderCount"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/name/text()')").as("name"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/address/text()')").as("address"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/tracknumber/text()')").as("tracknumber"),
    expr("xpath(response_output, '/USR_ORD/OrderResponse/OrderResult/ORDValue/orders/order/status/text()')").as("status")
  )

val result = df1.selectExpr("customer_id", "OrderCount", "posexplode(name) as (idx, name)")
  .join(
    df1.selectExpr("customer_id", "posexplode(address) as (idx, address)"),
    Seq("idx", "customer_id")
  ).join(
    df1.selectExpr("customer_id","posexplode(tracknumber) as (idx, tracknumber)"),
    Seq("idx", "customer_id")
  ).join(
    df1.selectExpr("customer_id", "posexplode(status) as (idx, status)"),
    Seq("idx", "customer_id")
  ).drop("idx")

result.show(false)

//+-----------+----------+--------------+--------------+-----------+------+
//|customer_id|OrderCount|name          |address       |tracknumber|status|
//+-----------+----------+--------------+--------------+-----------+------+
//|100        |1         |MRS MITA PERS |17 MAXI RD CHN|7825222    |FAIL  |
//|200        |4         |MR RITA SOMA  |606 JAL TXS   |7825225    |UNK   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825226    |FAIL  |
//|200        |4         |MR RODREX SOMA|18, GHC, BAN  |7825224    |SUC   |
//|200        |4         |MR RITA SOMA  |1 BAL, HAL    |7825223    |SUC   |
//+-----------+----------+--------------+--------------+-----------+------+
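The joins above use customer_id as the key, so they assume there is only one row per customer_id. A possible variant that avoids the joins is to posexplode only one array and look up the other arrays by the generated index. The following is a sketch of that different technique, not the code above: the session setup, the shortened two-order XML, and the names session, base, orderPath, arrays and result are all assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Assumed local session for the sketch; in spark-shell, getOrCreate() reuses the existing one.
val session = SparkSession.builder().master("local[1]").appName("xpath-index-sketch").getOrCreate()

// Hypothetical, shortened input with two orders.
val xml =
  "<USR_ORD><OrderResponse><OrderResult><OrderCount>2</OrderCount><ORDValue><orders>" +
    "<order><name>MR RITA SOMA</name><address>606 JAL TXS</address><tracknumber>7825225</tracknumber><status>UNK</status></order>" +
    "<order><name>MR RODREX SOMA</name><address>18, GHC, BAN</address><tracknumber>7825224</tracknumber><status>SUC</status></order>" +
  "</orders></ORDValue></OrderResult></OrderResponse></USR_ORD>"

val df = session.createDataFrame(Seq((200, xml))).toDF("customer_id", "response_output")

val base = "/USR_ORD/OrderResponse/OrderResult"
val orderPath = s"$base/ORDValue/orders/order"

// One array column per order field, all taken from the same input row.
val arrays = df.select(
  col("customer_id"),
  expr(s"xpath_string(response_output, '$base/OrderCount')").as("OrderCount"),
  expr(s"xpath(response_output, '$orderPath/name/text()')").as("names"),
  expr(s"xpath(response_output, '$orderPath/address/text()')").as("addresses"),
  expr(s"xpath(response_output, '$orderPath/tracknumber/text()')").as("tracknumbers"),
  expr(s"xpath(response_output, '$orderPath/status/text()')").as("statuses")
)

// Explode only `names` with its position, then index the other arrays by that position.
val result = arrays
  .selectExpr("customer_id", "OrderCount", "posexplode(names) as (idx, name)",
    "addresses", "tracknumbers", "statuses")
  .selectExpr("customer_id", "OrderCount", "name",
    "addresses[idx] as address", "tracknumbers[idx] as tracknumber", "statuses[idx] as status")

result.show(false)
```

Because each lookup stays within the row that produced the arrays, no join key is needed. As with the zipped version, this assumes every order contains all four fields so that the positions line up.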

Upvotes: 1
