Sparker0i

Reputation: 1861

Spark unable to Explode column

For the given JSON Response:

{
    "id": "1575972348068_1649088229",
    "results": [
        {
            "rows_count": 53,
            "runtime_seconds": 0.004000000189989805,
            "columns": [
                "ROLE_ID",
                "ROLE_NAME"
            ],
            "columns_type": [
                "number",
                "string"
            ],
            "limit": 2000000000,
            "index": 0,
            "rows": [
                [
                    "6",
                    "Incentive Plan Advisor                                                                              "
                ],
                [
                    "7",
                    "Security Admin                                                                                      "
                ]
            ],
            "command": "<an sql command>"
        }
    ],
    "status": "completed"
}

I want to get the rows in this JSON as a Spark DataFrame. To do that, I am trying to explode the results entry using:

response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()

I get this response:

+--------------------+--------------------+---------+
|                  id|             results|   status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+

+--------------------+
|       results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+

When I try to explode once more, I get this error:

flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
   +- Project [results#75, results_flat1#91]
      +- Generate explode(results#75), false, [results_flat1#91]
         +- LogicalRDD [id#74, results#75, status#76], false

From the error message I can see that the input to explode needs to be an array or map type, not a struct. To work around that, I tried re-parsing the value as JSON:

val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()

Trying this, gives another error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
    at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
    at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
    at com.ibm.cmdwcloud.Main.main(Main.scala)

I am not aware of any method that would let me extract the rows object directly and convert it to a DataFrame. Please help with this.

EDIT:

I'm able to see this schema though:

root
 |-- results_flat1: struct (nullable = true)
 |    |-- columns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- columns_type: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- command: string (nullable = true)
 |    |-- index: long (nullable = true)
 |    |-- limit: long (nullable = true)
 |    |-- rows: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- rows_count: long (nullable = true)
 |    |-- runtime_seconds: double (nullable = true)

But I am unable to explode on this.
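For reference, a struct itself is not explodable, but an array field inside it can be reached with dot notation and exploded directly. A minimal, self-contained sketch (the single-result JSON below is a stand-in for the full response above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._

// minimal stand-in for the response DataFrame described above
val response = spark.read.json(Seq(
  """{"results":[{"rows":[["6","Incentive Plan Advisor"],["7","Security Admin"]]}]}"""
).toDS())

val flattened = response.select(explode($"results").as("results_flat1"))

// the struct is not explodable, but its nested array field is
flattened.select(explode($"results_flat1.rows").as("rows_flat")).show()
```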

EDIT 2:

Thanks to the response below, I've gotten closer to what I'm trying to achieve. I performed this:

val flattened = response.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select($"rows", explode($"rows").as("rows_flat"))
  .select($"rows_flat")

flattened.show()

And got this output:

+--------------------+
|           rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+

Is it now possible to explode this further and map it to a schema, so that I get something like:

+--------------------+--------------------+
|             role_id|           role_name|
+--------------------+--------------------+
|                   6| Incentive Plan Ad..|
|                   7|      Security Admin|
+--------------------+--------------------+

Upvotes: 0

Views: 2114

Answers (1)

baitmbarek

Reputation: 2518

You don't have to explode your structure twice.

Is this suitable?

val flattened = response.select(explode($"results").as("results_flat1"))
  .select("results_flat1.*")
flattened.show(false)

+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|columns             |columns_type    |command         |index|limit     |rows                                                                      |rows_count|runtime_seconds     |
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|[ROLE_ID, ROLE_NAME]|[number, string]|<an sql command>|0    |2000000000|[WrappedArray(6, Incentive Plan Advisor), WrappedArray(7, Security Admin)]|53        |0.004000000189989805|
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+

Edit 2: Here's a solution for your second edit:

val flattened = response.select($"results", explode($"results").as("results_flat1"))
      .select("results_flat1.*")
      .select(explode($"rows").as("rows"))
      .select($"rows".getItem(0).as("idx"),$"rows".getItem(1).as("label"))

Output:

+---+--------------------+
|idx|               label|
+---+--------------------+
|  6|Incentive Plan Ad...|
|  7|      Security Admin|
+---+--------------------+
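To get exactly the role_id/role_name shape asked for in the second edit, you can also cast the id to an integer and trim the role names, which are right-padded with spaces in the source JSON. A self-contained sketch (the one-result JSON below stands in for the full response):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, trim}

val spark = SparkSession.builder().master("local[*]").appName("roles").getOrCreate()
import spark.implicits._

// minimal stand-in for the response DataFrame, with padded role names
val response = spark.read.json(Seq(
  """{"results":[{"rows":[["6","Incentive Plan Advisor   "],["7","Security Admin  "]]}]}"""
).toDS())

val roles = response
  .select(explode($"results").as("r"))            // one row per results entry
  .select(explode($"r.rows").as("row"))           // one row per data row
  .select(
    $"row".getItem(0).cast("int").as("role_id"),  // "6" -> 6
    trim($"row".getItem(1)).as("role_name")       // strip the padding
  )

roles.show()
```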

Upvotes: 2
