Ankur

Reputation: 21

Spark: Rows to Columns (like transpose or pivot)

How can I transpose rows to columns using an RDD or DataFrame, without using pivot? Input:

SessionId,date,orig,dest,legind,nbr

1,9/20/16,abc0,xyz0,o,1
1,9/20/16,abc1,xyz1,o,2
1,9/20/16,abc2,xyz2,i,3
1,9/20/16,abc3,xyz3,i,4

So I want to generate new schema like:

SessionId,date,orig1, orig2, orig3, orig4, dest1, dest2, dest3,dest4

1,9/20/16,abc0,abc1,null, null, xyz0,xyz1, null, null

The logic is: if legind is "o" the row contributes an orig column, and if it is "i" a dest column.

So, how can I transpose the rows into columns?

Any idea would be greatly appreciated.

I tried the option below, but it just flattens everything into a single row:

val keys = List("SessionId")

// Take the first value of every non-grouped column
val selectFirstValueOfNoneGroupedColumns =
  df.columns
    .filterNot(keys.toSet)
    .map(_ -> "first").toMap

val grouped =
  df.groupBy(keys.head, keys.tail: _*)
    .agg(selectFirstValueOfNoneGroupedColumns)

grouped.show()

Upvotes: 1

Views: 2626

Answers (1)

zero323

Reputation: 330093

It is relatively simple if you use the pivot function. First, let's create a dataset like the one in your question:

import org.apache.spark.sql.functions.{concat, first, lit, when}
import spark.implicits._  // for $-notation and toDF (already in scope in spark-shell)

val df = Seq(
  ("1", "9/20/16", "abc0", "xyz0", "o", "1"),
  ("1", "9/20/16", "abc1", "xyz1", "o", "2"),
  ("1", "9/20/16", "abc2", "xyz2", "i", "3"),
  ("1", "9/20/16", "abc3", "xyz3", "i", "4")
).toDF("SessionId", "date", "orig", "dest", "legind", "nbr")

Then define and attach the helper columns:

// This will be the column name
val key = when($"legind" === "o", concat(lit("orig"), $"nbr"))
           .when($"legind" === "i", concat(lit("dest"), $"nbr"))

// This will be the value
val value = when($"legind" === "o", $"orig")     // If o take origin
              .when($"legind" === "i", $"dest")  // If i take dest

val withKV = df.withColumn("key", key).withColumn("value", value)

This will result in a DataFrame like this:

+---------+-------+----+----+------+---+-----+-----+
|SessionId|   date|orig|dest|legind|nbr|  key|value|
+---------+-------+----+----+------+---+-----+-----+
|        1|9/20/16|abc0|xyz0|     o|  1|orig1| abc0|
|        1|9/20/16|abc1|xyz1|     o|  2|orig2| abc1|
|        1|9/20/16|abc2|xyz2|     i|  3|dest3| xyz2|
|        1|9/20/16|abc3|xyz3|     i|  4|dest4| xyz3|
+---------+-------+----+----+------+---+-----+-----+

Next let's define a list of possible levels:

val levels = Seq("orig", "dest").flatMap(x => (1 to 4).map(y => s"$x$y"))

and finally pivot:

val result = withKV
  .groupBy($"sessionId", $"date")
  .pivot("key", levels)
  .agg(first($"value", ignoreNulls = true))

result.show

And the result is:

+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|sessionId|   date|orig1|orig2|orig3|orig4|dest1|dest2|dest3|dest4|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
|        1|9/20/16| abc0| abc1| null| null| null| null| xyz2| xyz3|
+---------+-------+-----+-----+-----+-----+-----+-----+-----+-----+
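Since the question also asks for a way without pivot, the same spread can be expressed with plain conditional aggregation: one `first(when(...))` expression per target column, built from the same `levels` list. This is only a sketch reusing `withKV` and `levels` from above, not a tested drop-in replacement:

```scala
import org.apache.spark.sql.functions.{first, when}

// One aggregate per target column: take the first non-null value
// from rows whose key matches that column's name.
val exprs = levels.map(l =>
  first(when($"key" === l, $"value"), ignoreNulls = true).as(l))

val noPivot = withKV
  .groupBy($"sessionId", $"date")
  .agg(exprs.head, exprs.tail: _*)
```

In practice, `pivot` with an explicit list of levels should be preferred: passing the levels up front lets Spark skip the extra job it would otherwise run to collect the distinct keys.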

Upvotes: 1
