user3780814

Reputation: 147

Spark explode nested JSON with Array in Scala

Let's say I loaded a JSON file into Spark 1.6 via

sqlContext.read.json("/hdfs/")

It gives me a DataFrame with the following schema:

root
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- color: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)

The DataFrame has only one row, with arrays holding all of my items:

+--------------------+--------------------+--------------------+
|                id_e|           checked_e|             color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+

I want a DataFrame with the arrays exploded into one item per row:

+--------------------+-----+-------+
|                  id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
...

So far I have achieved this by registering a temporary table from the array DataFrame and querying it with LATERAL VIEW explode:

val results = sqlContext.sql("""
  SELECT id, color, checked FROM temptable
  LATERAL VIEW explode(checked_e) temptable AS checked
  LATERAL VIEW explode(id_e) temptable AS id
  LATERAL VIEW explode(color_e) temptable AS color
""")

Is there any way to achieve this directly with DataFrame functions, without using SQL? I know there is something like df.explode(...), but I could not get it to work with my data.

EDIT: It seems explode isn't what I really wanted in the first place. I want a new DataFrame that has each item of the arrays row by row. The chained explodes actually give back far more rows than my initial dataset has, because each LATERAL VIEW multiplies the rows produced by the previous one (a Cartesian product of the three arrays).
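Zipping the arrays element-wise and exploding once should avoid that blow-up. A minimal, untested sketch using the DataFrame.explode(Column*) overload from Spark 1.6, assuming the loaded DataFrame is bound to df, all three arrays have the same length, and the *_e column names from the table above:

import sqlContext.implicits._
import org.apache.spark.sql.Row

// Zip the arrays into triples and explode once: one output row per index
val zipped = df.explode($"id_e", $"checked_e", $"color_e") {
  case Row(ids: Seq[String @unchecked], checks: Seq[String @unchecked], colors: Seq[String @unchecked]) =>
    (ids, checks, colors).zipped.toSeq
}

// The generated columns are named _1, _2, _3
val result = zipped.select($"_1".as("id"), $"_2".as("checked"), $"_3".as("color"))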

Upvotes: 5

Views: 19733

Answers (2)

Rockie Yang

Reputation: 4925

The following solution should work.

import org.apache.spark.sql.functions._
import sqlContext.implicits._ // for the $"colName" syntax

val data = Seq((Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val df = sqlContext.createDataFrame(data)

// UDF that zips the three arrays element-wise into one array of triples
val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]] {
    case (a, b, c) => (a, b, c).zipped.toSeq
}

// Explode the zipped array: one row per element, not a Cartesian product
val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))
val exploded = df3.select(explode($"udf3").alias("col3"))

// Unpack the struct fields of each triple into separate columns
exploded.withColumn("first", $"col3".getItem("_1"))
    .withColumn("second", $"col3".getItem("_2"))
    .withColumn("third", $"col3".getItem("_3")).show

It is more straightforward, and probably more efficient, to do the zipping in plain Scala; Spark cannot parallelize anything anyway when there is only one row.

val data = Seq((Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))

// Zip and flatten on the driver: one tuple per array index
val seqExploded = data.flatMap {
    case (a: Seq[Int], b: Seq[Int], c: Seq[Int]) => (a, b, c).zipped.toSeq
}

val dfTheSame = sqlContext.createDataFrame(seqExploded)
dfTheSame.show
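Both variants produce the same three rows; the plain Scala version simply zips on the driver before the data enters Spark, skipping the UDF and the struct unpacking.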

Upvotes: 5

Vitalii Kotliarenko

Reputation: 2967

It should be as simple as this:

df.withColumn("id", explode(col("id_e")))
  .withColumn("checked", explode(col("checked_e")))
  .withColumn("color", explode(col("color_e")))

Upvotes: 1
