Tamilselvan

Reputation: 53

Spark 2.3 with Java8 transform a row to columns

I am new to Spark 2.3 with Java 8 and need help. Here is an example of my data:

Source DataFrame

+--------------+
| key | Value  |
+--------------+
| A   | John   |
| B   | Nick   |
| A   | Mary   |
| B   | Kathy  |
| C   | Sabrina|
| B   | George |
+--------------+

Meta DataFrame

+-----+
| key |
+-----+
| A   |
| B   |
| C   |
| D   |
| E   |
| F   |
+-----+

I would like to transform this into the following: the column names come from the Meta DataFrame, and the rows are built from the Source DataFrame.

+-----------------------------------------------+
| A    | B      | C       | D     | E    | F    |
+-----------------------------------------------+
| John | Nick   | Sabrina | null  | null | null |
| Mary | Kathy  | null    | null  | null | null |
| null | George | null    | null  | null | null |
+-----------------------------------------------+

I need to write this in Spark 2.3 with Java 8. Your help is appreciated.

Upvotes: 1

Views: 563

Answers (1)

Oli

Reputation: 10406

To make things clearer (and easily reproducible) let's define dataframes:

// assuming a SparkSession named spark
import spark.implicits._

val df1 = Seq("A" -> "John", "B" -> "Nick", "A" -> "Mary",
              "B" -> "Kathy", "C" -> "Sabrina", "B" -> "George")
          .toDF("key", "value")
val df2 = Seq("A", "B", "C", "D", "E", "F").toDF("key")

From what I see, you are trying to create one column per value in the key column of df2. These columns should contain all the values of the value column associated with the key naming the column. To take an example, column A's first value should be the value of the first occurrence of A (if it exists, null otherwise): "John". Its second value should be the value of the second occurrence of A: "Mary". There is no third occurrence, so the third value of the column should be null.
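To see why this works, the same rank-then-pivot idea can be sketched in plain Java collections, without Spark. This is only an illustration of the logic (the class and method names are mine, not part of any API): each value gets the rank of its key's occurrence so far, and values sharing a rank end up on the same output row.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RankDemo {
    // assign each value the occurrence rank of its key, then pivot: rank -> (key -> value)
    static Map<Integer, Map<String, String>> pivot(List<String[]> rows) {
        Map<String, Integer> seen = new HashMap<>();           // occurrences of each key so far
        Map<Integer, Map<String, String>> out = new TreeMap<>();
        for (String[] r : rows) {
            int rank = seen.merge(r[0], 1, Integer::sum);      // 1 for first occurrence, 2 for second, ...
            out.computeIfAbsent(rank, k -> new TreeMap<>()).put(r[0], r[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[]{"A", "John"}, new String[]{"B", "Nick"},
            new String[]{"A", "Mary"}, new String[]{"B", "Kathy"},
            new String[]{"C", "Sabrina"}, new String[]{"B", "George"});
        System.out.println(pivot(rows));
        // {1={A=John, B=Nick, C=Sabrina}, 2={A=Mary, B=Kathy}, 3={B=George}}
    }
}
```

Each entry of the outer map corresponds to one row of the desired result; keys absent at a given rank become nulls when the row is rendered against the full key list.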

I detailed it to show that we need a notion of rank of the values for each key (windowing function), and group by that notion of rank. It would go as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df1_win = df1
    .withColumn("id", monotonically_increasing_id)
    .withColumn("rank", rank() over Window.partitionBy("key").orderBy("id"))
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
val keys = df2.collect.map(_.getAs[String](0)).sorted

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys) 
    .agg(first('value))
    .orderBy("rank")
    //.drop("rank") // I keep it here for clarity
    .show()
+----+----+------+-------+----+----+----+                                       
|rank|   A|     B|      C|   D|   E|   F|
+----+----+------+-------+----+----+----+
|   1|John|  Nick|Sabrina|null|null|null|
|   2|Mary| Kathy|   null|null|null|null|
|   3|null|George|   null|null|null|null|
+----+----+------+-------+----+----+----+

Here is the very same code in Java (with the imports it needs):

import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;

// the id is just here to maintain the original order.
Dataset<Row> df1_win = df1
    .withColumn("id", functions.monotonically_increasing_id())
    .withColumn("rank", functions.rank().over(Window.partitionBy("key").orderBy("id")));

// getting the keys in df2. Add distinct if there are duplicates.
// Note that it is a list of objects, to match the (strange) signature of pivot
List<Object> keys = df2.collectAsList().stream()
    .map(x -> x.getString(0))
    .sorted().collect(Collectors.toList());

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys)
    .agg(functions.first(functions.col("value")))
    .orderBy("rank")
    // .drop("rank") // I keep it here for clarity
    .show();

Upvotes: 3
