SanjanaSanju

Reputation: 297

Exploding multiple array columns in Spark for a changing input schema

Below is my sample schema.

 |-- provider: string (nullable = true)
 |-- product: string (nullable = true)
 |-- asset_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- asset_id: string (nullable = true)
 |-- asset_class: string (nullable = true)
 |-- Actors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Actors_Display: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Audio_Type: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Billing_ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Bit_Rate: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- CA_Rating: array (nullable = true)
 |    |-- element: string (containsNull = false)

I need to explode all the array-type columns. I have 80+ columns, and the set of columns keeps changing. I am currently using explode_outer(arrays_zip(...)):

   // explode_outer without an alias names its output column "col"
   val df = sourcedf.select($"provider", $"asset_name", $"description", $"creation_date", $"provider_id", $"asset_id", $"asset_class", $"product", $"eligible_platform", $"actors", $"category",
     explode_outer(arrays_zip($"Actors_Display", $"Audio_Type", $"Billing_ID", $"Bit_Rate", $"CA_Rating")))

   // Pull the zipped fields back out of the exploded struct
   val parsed_output = df.select(col("provider"), col("asset_name"), col("description"), col("creation_date"), col("product"),
     col("provider_id"), col("asset_id"), col("asset_class"),
     col("col.Actors_Display"), col("col.Audio_Type"), col("col.Billing_ID"), col("col.Bit_Rate"), col("col.CA_Rating"))

With the code above I get the expected output, but it only works for one particular file. In my case, new columns are added frequently. Is there a function that explodes multiple array columns for a changing schema and also selects the non-array columns from the file? Could someone please give an example?

Note: Only the array columns keep changing; the rest stay constant.

Below is the sample data

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ADIL>
  <Meta>
    <AMS Asset_Name="asd" Provider="Level" Product="MOTD" Version_Major="1" Version_Minor="0" Description="ZXC" Creation_Date="2009-12-03" Provider_ID="qwer.com" Asset_ID="A12we" Asset_Class="package"/>
    <App_Data App="MOD" Name="Actors" Value="CableLa1.1"/>
    <App_Data App="MOD" Name="Actors_Display" Value="RTY"/>
    <App_Data App="MOD" Name="Audio_Type" Value="FGH"/>
  </Meta>
  <Asset>
    <Meta>
      <AMS Asset_Name="bnm" Provider="Level Film" Product="MOTD" Version_Major="1" Version_Minor="0" Description="bnj7" Creation_Date="2009-12-03" Provider_ID="levelfilm.com" Asset_ID="DDDB0610072533182333" Asset_Class="title"/>
      <App_Data App="rt" Name="Billing_ID" Value="2020-12-29T00:00:00"/>
      <App_Data App="MOD" Name="Bit_Rate" Value="2021-12-29T23:59:59"/>
      <App_Data App="MOD" Name="CA_Rating" Value="16.99"/>
      </Meta>
    <Asset>
      <Meta>
       <AMS Asset_Name="atysd" Provider="Level1" Product="MOTD2" Version_Major="1" Version_Minor="0" Description="ZXCY" Creation_Date="2009-12-03" Provider_ID="qweDFtrr.com" Asset_ID="A12FGwe" Asset_Class="review"/>
     

This is the XML data. It is parsed first: each Name attribute value becomes a column name, and the corresponding Value attribute value becomes that column's value. The XML has repeating tags, so the parsed result contains array columns; I used collect_list at the end of the parsing logic.
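
For illustration only, a parsing step of this kind might look roughly like the sketch below. It assumes the App_Data elements have already been flattened into one row per element; the column names asset_id, name and value and the helper toWide are hypothetical, not taken from the actual parsing logic. It shows why every new Name in a file produces a new array column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.collect_list

object PivotNameValue {
  // Hypothetical helper: each distinct `name` becomes a column, and repeated
  // names for the same asset are gathered into an array by collect_list.
  def toWide(appData: DataFrame): DataFrame =
    appData.groupBy("asset_id").pivot("name").agg(collect_list("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("pivot-name-value")
      .getOrCreate()
    import spark.implicits._

    // Assumed intermediate shape: one row per App_Data element.
    val appData = Seq(
      ("A12we", "Actors", "movie"),
      ("A12we", "Actors", "cinema"),
      ("A12we", "Audio_Type", "High"),
      ("A12we", "Audio_Type", "low")
    ).toDF("asset_id", "name", "value")

    val wide = toWide(appData)
    wide.printSchema() // Actors and Audio_Type appear as array<string> columns
    wide.show(false)

    spark.stop()
  }
}
```

Because the pivoted column set depends on the Name values present in the input, the resulting schema differs from file to file.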

This is the sample output after parsing.

+--------------------+--------------+--------------+----------+-----------+
|Actors              |Actors_Display|Audio_Type    |Billing_ID|Bit_Rate   |
+--------------------+--------------+--------------+----------+-----------+
|["movie","cinema"]  |["Dolby 5.1"] |["High","low"]|["GAR15"] |["15","14"]|
+--------------------+--------------+--------------+----------+-----------+

Upvotes: 0

Views: 506

Answers (1)

Test Mirror

Reputation: 344

Assuming you want to explode all ArrayType columns (otherwise, filter accordingly):

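import org.apache.spark.sql.functions.{arrays_zip, col, explode_outer}
import org.apache.spark.sql.types.{StructField, ArrayType}
import spark.implicits._  // toDF and $"..." syntax; `spark` is the active SparkSession

val df = Seq(
  (1, "xx", Seq(10, 20), Seq("a", "b"), Seq("p", "q")),
  (2, "yy", Seq(30, 40), Seq("c", "d"), Seq("r", "s"))
).toDF("c1", "c2", "a1", "a2", "a3")

// Collect every column whose data type is ArrayType
val arrCols = df.schema.fields
  .collect{case StructField(name, _: ArrayType, _, _) => name}
  .map(col)

// All remaining columns are passed through unchanged
val otherCols = df.columns.map(col) diff arrCols

// Zip the arrays element-wise, explode the zipped array, then flatten the struct
df.withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
  .select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)
  .show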
+---+---+---+---+---+
| c1| c2| a1| a2| a3|
+---+---+---+---+---+
|  1| xx| 10|  a|  p|
|  1| xx| 20|  b|  q|
|  2| yy| 30|  c|  r|
|  2| yy| 40|  d|  s|
+---+---+---+---+---+
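
Since the array columns change from file to file, it may help to wrap the same idea in a function that takes any DataFrame. The following is a minimal sketch under that assumption; the object and function names (ExplodeArrays, explodeArrayColumns) and the sample rows are made up for illustration. Note that arrays_zip pads shorter arrays with null, so arrays of unequal length produce rows with null entries.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{arrays_zip, col, explode_outer}
import org.apache.spark.sql.types.ArrayType

object ExplodeArrays {
  // Hypothetical helper: zips every ArrayType column found in the schema,
  // explodes the zipped array, and keeps all other columns unchanged.
  def explodeArrayColumns(df: DataFrame): DataFrame = {
    val (arrayFields, otherFields) =
      df.schema.fields.partition(_.dataType.isInstanceOf[ArrayType])

    if (arrayFields.isEmpty) df // nothing to explode
    else {
      val zipped = arrays_zip(arrayFields.map(f => col(f.name)): _*)
      // Non-array columns first, then the flattened struct fields
      val selected: Seq[Column] =
        otherFields.map(f => col(f.name)).toSeq :+ col("arr_zip.*")
      df.withColumn("arr_zip", explode_outer(zipped)).select(selected: _*)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-arrays")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows with arrays of unequal length
    val input = Seq(
      ("Level", Seq("movie", "cinema"), Seq("Dolby 5.1")),
      ("Level1", Seq("drama"), Seq("High", "low"))
    ).toDF("provider", "Actors", "Audio_Type")

    explodeArrayColumns(input).show(false)
    spark.stop()
  }
}
```

The function only inspects the schema at call time, so a file with new array columns needs no code change. Column names containing dots would need backtick quoting, which this sketch does not handle.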

Upvotes: 1
