Reputation: 1861
For the given JSON Response:
{
"id": "1575972348068_1649088229",
"results": [
{
"rows_count": 53,
"runtime_seconds": 0.004000000189989805,
"columns": [
"ROLE_ID",
"ROLE_NAME"
],
"columns_type": [
"number",
"string"
],
"limit": 2000000000,
"index": 0,
"rows": [
[
"6",
"Incentive Plan Advisor "
],
[
"7",
"Security Admin "
]
],
"command": "<an sql command>"
}
],
"status": "completed"
}
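For context, the response DataFrame used below is assumed to be built by reading this JSON with spark.read.json (a sketch; the payload is the one shown above, collapsed to a single line):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-flatten").getOrCreate()
import spark.implicits._

// the JSON payload shown above, on one line
val jsonString = """{"id":"1575972348068_1649088229","results":[{"rows_count":53,"runtime_seconds":0.004000000189989805,"columns":["ROLE_ID","ROLE_NAME"],"columns_type":["number","string"],"limit":2000000000,"index":0,"rows":[["6","Incentive Plan Advisor "],["7","Security Admin "]],"command":"<an sql command>"}],"status":"completed"}"""

// each element of the Dataset is one JSON document
val response = spark.read.json(Seq(jsonString).toDS())
response.printSchema()
```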
I want to get the rows in this JSON as a Spark DataFrame. For this, I am trying to explode the results entry using:
response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()
I get this response:
+--------------------+--------------------+---------+
| id| results| status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+
+--------------------+
| results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+
When I try to explode one level further, I get this error:
flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
+- Project [results#75, results_flat1#91]
+- Generate explode(results#75), false, [results_flat1#91]
+- LogicalRDD [id#74, results#75, status#76], false
From what I can analyse, explode needs an array or map type as input, not a struct. To get around that, I tried:
val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()
Trying this gives another error:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:255)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
at com.ibm.cmdwcloud.Main.main(Main.scala)
I am not aware of any method that can get the rows object directly and convert it to a DataFrame. Please help with this.
EDIT:
I'm able to see this schema though:
root
|-- results_flat1: struct (nullable = true)
| |-- columns: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- columns_type: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- command: string (nullable = true)
| |-- index: long (nullable = true)
| |-- limit: long (nullable = true)
| |-- rows: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: string (containsNull = true)
| |-- rows_count: long (nullable = true)
| |-- runtime_seconds: double (nullable = true)
But I am unable to explode this struct.
EDIT 2:
Thanks to the response below, I've gotten closer to what I'm trying to achieve. I performed this:
val flattened = response.select($"results", explode($"results").as("results_flat1"))
.select("results_flat1.*")
.select($"rows", explode($"rows").as("rows_flat"))
.select($"rows_flat")
flattened.show()
And got this output:
+--------------------+
| rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+
Is it now possible to map this to a schema so that I can get something like:
+--------------------+--------------------+
| role_id| role_name|
+--------------------+--------------------+
| 6| Incentive Plan Ad..|
| 7| Security Admin|
+--------------------+--------------------+
Upvotes: 0
Views: 2114
Reputation: 2518
You don't have to explode your structure twice. Is this suitable?
val flattened = response.select(explode($"results").as("results_flat1"))
.select("results_flat1.*")
flattened.show(false)
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|columns |columns_type |command |index|limit |rows |rows_count|runtime_seconds |
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
|[ROLE_ID, ROLE_NAME]|[number, string]|<an sql command>|0 |2000000000|[WrappedArray(6, Incentive Plan Advisor), WrappedArray(7, Security Admin)]|53 |0.004000000189989805|
+--------------------+----------------+----------------+-----+----------+--------------------------------------------------------------------------+----------+--------------------+
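A variant that avoids hard-coding the output column names: since the payload carries its own columns list, the names can be read from the flattened struct itself (a sketch, building on the flattened DataFrame above):

```scala
// take the declared column names from the struct's columns field
val colNames = flattened.select($"columns").first().getSeq[String](0)

// explode the rows array, then turn each element of the inner array
// into its own column, named after the corresponding columns entry
val byName = flattened
  .select(explode($"rows").as("row"))
  .select(colNames.zipWithIndex.map { case (n, i) => $"row".getItem(i).as(n) }: _*)
```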
Edit 2: Here's a solution for your second edit:
val flattened = response.select($"results", explode($"results").as("results_flat1"))
.select("results_flat1.*")
.select(explode($"rows").as("rows"))
.select($"rows".getItem(0).as("idx"),$"rows".getItem(1).as("label"))
Output :
+---+--------------------+
|idx| label|
+---+--------------------+
| 6|Incentive Plan Ad...|
| 7| Security Admin|
+---+--------------------+
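If the id should end up numeric (columns_type declares it as "number") and the trailing spaces in the labels matter, a cast plus trim on top of the result above would give (a sketch):

```scala
import org.apache.spark.sql.functions.trim

// cast the string id to long and strip the trailing spaces from the labels
val typed = flattened
  .select($"idx".cast("long").as("role_id"),
          trim($"label").as("role_name"))
```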
Upvotes: 2