Maruti K

Reputation: 233

Transpose DataFrame Without Aggregation in Spark with scala

I have looked at a number of different solutions online, but could not find what I am trying to achieve. Please help me with this.

I am using Apache Spark 2.1.0 with Scala. Below is my dataframe:


+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1       | val1  |
|col2       | val2  |
|col3       | val3  |
|col4       | val4  |
|col5       | val5  |
+-----------+-------+

I want this to be transposed to the below:


+-----+-------+-----+------+-----+
|col1 | col2  |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2  |val3 | val4 |val5 |
+-----+-------+-----+------+-----+

Upvotes: 22

Views: 36591

Answers (4)

stack0114106

Reputation: 8711

To enhance Ramesh Maharjan's answer: collect the dataframe and then convert it to a map.

val mp = df.as[(String,String)].collect.toMap  // Map(col1 -> val1, ..., col5 -> val5)

Starting from a dummy dataframe, we can then build the result one column at a time using foldLeft:

val f = Seq("1").toDF("dummy")

mp.keys.toList.sorted.foldLeft(f) { (acc,x) => acc.withColumn(x, lit(mp(x))) }.drop("dummy").show(false)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+

Upvotes: 0

stack0114106
stack0114106

Reputation: 8711

Another solution, though a lengthy one, using crosstab:

val dfp = spark.sql(""" with t1 as (
  select 'col1' c1, 'val1' c2 union all
  select 'col2' c1, 'val2' c2 union all
  select 'col3' c1, 'val3' c2 union all
  select 'col4' c1, 'val4' c2 union all
  select 'col5' c1, 'val5' c2
) select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50,false)

+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1       |val1 |
|col2       |val2 |
|col3       |val3 |
|col4       |val4 |
|col5       |val5 |
+-----------+-----+

// crosstab builds a 0/1 contingency matrix of value vs column_name
val dfp2 = dfp.groupBy("column_name").agg(first($"value") as "value").stat.crosstab("value", "column_name")
dfp2.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |1   |0   |0   |0   |0   |
|val3             |0   |0   |1   |0   |0   |
|val2             |0   |1   |0   |0   |0   |
|val5             |0   |0   |0   |0   |1   |
|val4             |0   |0   |0   |1   |0   |
+-----------------+----+----+----+----+----+

val needed_cols = dfp2.columns.drop(1)

needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)

// replace each 1 in the matrix with that row's value (the remaining 0s are cast to the string "0")
val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x, expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |val1|0   |0   |0   |0   |
|val3             |0   |0   |val3|0   |0   |
|val2             |0   |val2|0   |0   |0   |
|val5             |0   |0   |0   |0   |val5|
|val4             |0   |0   |0   |val4|0   |
+-----------------+----+----+----+----+----+

// max per column keeps the "valN" string, since it sorts above "0"
dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+

Upvotes: 2

Raphael Roth

Reputation: 27373

You can do this using pivot, but you still need an aggregation; consider what should happen if there were multiple values for one COLUMN_NAME.

val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df
  .groupBy()
  .pivot("COLUMN_NAME").agg(first("VALUE"))
  .show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
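To see why the aggregation matters: if a COLUMN_NAME occurred more than once (the dfDup data and the val1b value below are made up for illustration), first would silently keep a single, partition-order dependent value per column, while collect_list would keep all of them:

import org.apache.spark.sql.functions._

val dfDup = Seq(
  ("col1", "val1"),
  ("col1", "val1b"),  // hypothetical second value for col1
  ("col2", "val2")
).toDF("COLUMN_NAME", "VALUE")

// first() keeps a single value per pivoted column
dfDup.groupBy().pivot("COLUMN_NAME").agg(first("VALUE")).show()

// collect_list() keeps all values
dfDup.groupBy().pivot("COLUMN_NAME").agg(collect_list("VALUE")).show(false)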

EDIT:

If your dataframe really is as small as in your example, you can collect it as a Map:

val map = df.as[(String,String)].collect().toMap

and then apply this answer
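The linked answer is not reproduced here, but one way to turn that collected Map into the single-row result is a foldLeft over its keys, in the spirit of the foldLeft answer above (a minimal sketch; the dummy column is just a seed and is dropped at the end):

import org.apache.spark.sql.functions.lit

map.keys.toSeq.sorted
  .foldLeft(Seq(1).toDF("dummy")) { (acc, k) => acc.withColumn(k, lit(map(k))) }
  .drop("dummy")
  .show()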

Upvotes: 19

Ramesh Maharjan

Reputation: 41957

If your dataframe is small enough, as in the question, then you can collect COLUMN_NAME to form the schema and collect VALUE to form the rows, and then create a new dataframe, as below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
//creating schema from existing dataframe
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))
//creating RDD[Row]
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
//new dataframe creation
sqlContext.createDataFrame(values, schema).show(false)

which should give you

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
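On Spark 2.x the same can also go through the SparkSession entry point instead of the older sc/sqlContext handles (a minimal sketch reusing the schema built above; spark is the usual session variable):

// same rows as above, built via the session's SparkContext
val values = spark.sparkContext.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
spark.createDataFrame(values, schema).show(false)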

Upvotes: 13
