Reputation: 149
I have flattened incoming data in the below format in my parquet file:
I want to convert it into the below format, where I un-flatten the structure:
I tried the following:
Dataset<Row> rows = df.select(col("id"), col("country_cd"),
explode(array("fullname_1", "fullname_2")).as("fullname"),
explode(array("firstname_1", "firstname_2")).as("firstname"));
But it gives the below error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Only one generator allowed per select clause but found 2: explode(array(fullname_1, fullname_2)), explode(array(firstname_1, firstname_2));
I understand this is because you cannot use more than one explode in a single select clause. I am looking for options to do the above in Spark Java.
Upvotes: 2
Views: 6866
Reputation: 3088
You need to wrap the first and full names into an array of structs, which you then explode:
Dataset<Row> rows = df.select(col("id"), col("country_cd"),
    explode(
        array(
            struct(
                col("firstname_1").as("firstname"), col("fullname_1").as("fullname")),
            struct(
                col("firstname_2").as("firstname"), col("fullname_2").as("fullname"))
        )
    ).as("name"))
    // flatten the exploded struct back into top-level columns
    .select(col("id"), col("country_cd"), col("name.firstname"), col("name.fullname"));
This way you get a fast, narrow transformation, keep Scala/Python/R portability, and it should run quicker than the df.flatMap solution, which turns the DataFrame into an RDD that the query optimizer cannot improve. There might also be additional pressure on the Java garbage collector because of the copying from unsafe byte arrays to Java objects.
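If you want to confirm that this stays a single narrow transformation the optimizer can see through, a minimal sketch (assuming the rows Dataset built above) is to print the physical plan:
// Extended plan: the array-of-structs version should show a Generate (explode)
// over a plain Project, with no (De)SerializeFromObject / MapPartitions nodes,
// which is what a flatMap-based version would introduce.
rows.explain(true);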
Upvotes: 3
Reputation: 14379
As a database person, I like to use set-based operations for things like this, e.g. union:
val df = Seq(
("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
val df_new = df
  .select("id", "country_code", "fullname_1", "firstname_1")
  .union(df.select("id", "country_code", "fullname_2", "firstname_2"))
  .orderBy("id")
df_new.show
df.createOrReplaceTempView("tmp")
Or the equivalent SQL:
%sql
SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
FROM tmp
UNION
SELECT id, country_code, fullname_2, firstname_2
FROM tmp
My results:
I suppose one advantage over the flatMap technique is that you don't have to specify the datatypes, and it appears simpler on the face of it. It's up to you of course.
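Since the question asks for Spark Java, a rough Java translation of the same union idea (assuming the same df as above and Spark 2.3+ for unionByName) might look like:
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Select each (fullname_n, firstname_n) pair under common column names,
// then stack the two halves on top of each other.
Dataset<Row> first = df.select(col("id"), col("country_code"),
        col("fullname_1").as("fullname"), col("firstname_1").as("firstname"));
Dataset<Row> second = df.select(col("id"), col("country_code"),
        col("fullname_2").as("fullname"), col("firstname_2").as("firstname"));

Dataset<Row> dfNew = first.unionByName(second).orderBy("id");
dfNew.show();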
Upvotes: 1
Reputation: 2495
This type of problem is most easily solved with a .flatMap(). A .flatMap() is like a .map(), except that it allows you to output n records for each input record, as opposed to a 1:1 ratio.
val df = Seq(
(1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
(2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")
df.flatMap(row => {
  val id = row.getAs[Int]("id")
  val cc = row.getAs[String]("country_code")
  Seq(
    (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
    (id, cc, row.getAs[String]("fullname_2"), row.getAs[String]("firstname_2"))
  )
}).toDF("id", "country_code", "fullname", "firstname").show()
This results in the following:
+---+------------+-----------+---------+
| id|country_code|   fullname|firstname|
+---+------------+-----------+---------+
|  1|         USA|      Lee M|      Lee|
|  1|         USA|Dan A White|      Dan|
|  2|         CAN|Pate Poland|     Pate|
|  2|         CAN|Don Derheim|      Don|
+---+------------+-----------+---------+
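Since the question asks for Spark Java, a hedged sketch of the same flatMap approach in Java (assuming the same df as above and a Spark 2.x-style RowEncoder; Java needs an explicit output schema and encoder) could look like:
import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Schema of the un-flattened output rows.
StructType schema = new StructType()
    .add("id", DataTypes.IntegerType)
    .add("country_code", DataTypes.StringType)
    .add("fullname", DataTypes.StringType)
    .add("firstname", DataTypes.StringType);

// Emit two output rows per input row, one per (fullname_n, firstname_n) pair.
Dataset<Row> result = df.flatMap((FlatMapFunction<Row, Row>) row -> {
    Integer id = row.getAs("id");
    String cc = row.getAs("country_code");
    String full1 = row.getAs("fullname_1");
    String first1 = row.getAs("firstname_1");
    String full2 = row.getAs("fullname_2");
    String first2 = row.getAs("firstname_2");
    return Arrays.asList(
        RowFactory.create(id, cc, full1, first1),
        RowFactory.create(id, cc, full2, first2)
    ).iterator();
}, RowEncoder.apply(schema));

result.show();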
Upvotes: 5