sashmi

Reputation: 149

Combine multiple columns into single column in SPARK

I have a flattened incoming data in the below format in my parquet file:

[image: flattened input table with columns id, country_cd, fullname_1, firstname_1, fullname_2, firstname_2]

I want to convert it into the below format where I am non-flattening my structure:

[image: target table with columns id, country_cd, fullname, firstname — one row per name]

I tried the following:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
                explode(array("fullname_1", "fullname_2")).as("fullname"),
                explode(array("firstname_1", "firstname_2")).as("firstname"));

But it gives the below error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Only one generator allowed per select clause but found 2: explode(array(fullname_1, fullname_2)), explode(array(firstname_1, firstname_2));

I understand this is because Spark allows only one generator (such as explode) per select clause. I am looking for options to do the above in Spark Java.

Upvotes: 2

Views: 6866

Answers (3)

nefo_x

Reputation: 3088

You need to wrap the first and full names into an array of structs, which you then explode:

Dataset<Row> rows = df.select(col("id"), col("country_cd"),
    explode(
      array(
        struct(
          col("fullname_1").as("fullname"), col("firstname_1").as("firstname")),
        struct(
          col("fullname_2").as("fullname"), col("firstname_2").as("firstname"))
      )
    ).as("name"))
  // unpack the exploded struct back into top-level columns
  .select(col("id"), col("country_cd"), col("name.fullname"), col("name.firstname"));

This way you get a fast narrow transformation, keep Scala/Python/R portability, and it should run quicker than the df.flatMap solution, which turns the DataFrame into an RDD that the query optimizer cannot improve. There may also be extra pressure on the Java garbage collector from copying unsafe byte arrays into Java objects.
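For intuition, what the exploded array-of-structs computes per row can be sketched with plain Scala collections (illustrative only, no Spark — Spark performs the same unpivot natively as a narrow transformation):

```scala
// Hypothetical case class mirroring the flattened schema in the question.
case class In(id: Int, cc: String, full1: String, first1: String,
              full2: String, first2: String)

val input = Seq(
  In(1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
  In(2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
)

val exploded = input.flatMap { r =>
  // the "array of structs": one element per name pair
  val names = Seq((r.full1, r.first1), (r.full2, r.first2))
  // one output row per element of the array
  names.map { case (fullname, firstname) => (r.id, r.cc, fullname, firstname) }
}
// exploded now holds 4 tuples, two per input row
```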

Upvotes: 3

wBob

Reputation: 14379

As a database person, I like to use set-based operations for things like this, e.g. union:

val df = Seq(
  ("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
  ("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")


val df_new = df
  .select("id", "country_code", "fullname_1", "firstname_1").union(df.select("id", "country_code", "fullname_2", "firstname_2"))
  .orderBy("id")

df_new.show
df.createOrReplaceTempView("tmp")

Or the equivalent SQL:

%sql
SELECT id, country_code, fullname_1 AS fullname, firstname_1 AS firstname
FROM tmp
UNION
SELECT id, country_code, fullname_2, firstname_2
FROM tmp

My results:

[image: result table — one row per (id, fullname, firstname)]

I suppose one advantage over the flatMap technique is that you don't have to specify the data types, and it appears simpler on the face of it. It's up to you, of course.
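One detail worth noting: SQL UNION removes duplicate rows, whereas the DataFrame union method keeps them (UNION ALL semantics), so the two variants above can differ if identical name rows exist. The set-based idea itself is just two projections concatenated, as this plain-Scala sketch shows (illustrative only, no Spark):

```scala
// Same sample data as the answer, as plain tuples.
val data = Seq(
  ("1", "USA", "Lee M", "Lee", "Dan A White", "Dan"),
  ("2", "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
)

// Projection 1: the (fullname_1, firstname_1) pairs.
val first  = data.map { case (id, cc, f1, n1, _, _) => (id, cc, f1, n1) }
// Projection 2: the (fullname_2, firstname_2) pairs.
val second = data.map { case (id, cc, _, _, f2, n2) => (id, cc, f2, n2) }

// Concatenate and sort, like union followed by orderBy("id").
val combined = (first ++ second).sortBy(_._1)
```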

Upvotes: 1

Travis Hegner

Reputation: 2495

This type of problem is most easily solved with a .flatMap(). A .flatMap() is like a .map(), except that it allows you to output n records for each input record rather than a strict 1:1 ratio.
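In plain Scala collections (outside Spark) the distinction looks like this:

```scala
val xs = Seq(1, 2, 3)

// map: exactly one output element per input element (1:1)
val doubled = xs.map(x => x * 2)              // Seq(2, 4, 6)

// flatMap: each input element may produce any number of outputs (1:n)
val expanded = xs.flatMap(x => Seq(x, x * 2)) // Seq(1, 2, 2, 4, 3, 6)
```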

val df = Seq(
    (1, "USA", "Lee M", "Lee", "Dan A White", "Dan"),
    (2, "CAN", "Pate Poland", "Pate", "Don Derheim", "Don")
    ).toDF("id", "country_code", "fullname_1", "firstname_1", "fullname_2", "firstname_2")

df.flatMap(row => {
    val id = row.getAs[Int]("id")
    val cc = row.getAs[String]("country_code")
    Seq(
        (id, cc, row.getAs[String]("fullname_1"), row.getAs[String]("firstname_1")),
        (id, cc, row.getAs[String]("fullname_2"), row.getAs[String]("firstname_2"))
    )
}).toDF("id", "country_code", "fullname", "firstname").show()

This results in the following:

+---+------------+-----------+---------+
| id|country_code|   fullname|firstname|
+---+------------+-----------+---------+
|  1|         USA|      Lee M|      Lee|
|  1|         USA|Dan A White|      Dan|
|  2|         CAN|Pate Poland|     Pate|
|  2|         CAN|Don Derheim|      Don|
+---+------------+-----------+---------+

Upvotes: 5
