Reputation: 2730
I have looked at a number of questions online, but they don't seem to do what I'm trying to achieve.
I'm using Apache Spark 2.0.2 with Scala.
I have a dataframe:
+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
| 1| 100| 0| 0| 0| 0| 0|
| 2| 0| 50| 0| 0| 20| 0|
| 3| 0| 0| 0| 0| 0| 0|
| 4| 0| 0| 0| 0| 0| 0|
+----------+-----+----+----+----+----+----+
which I want to transpose to
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+-----+----+----+----+
I've tried using pivot(), but I couldn't get the right answer. I ended up looping through my val{x} columns, pivoting each one as below, but this is proving to be very slow.
val d = df.select('segment_id, 'val1)
+----------+-----+
|segment_id| val1|
+----------+-----+
| 1| 100|
| 2| 0|
| 3| 0|
| 4| 0|
+----------+-----+
d.withColumn("vals", lit("val1")).groupBy("vals").pivot("segment_id").sum("val1")
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
+----+-----+----+----+----+
Then on each iteration of val{x}, I union() the result onto my first dataframe (the full loop is sketched below).
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val2| 0| 50| 0| 0|
+----+-----+----+----+----+
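Concretely, the loop I ended up with looks roughly like this (just a sketch, assuming segment_id is the first column and the rest are the val{x} columns):

import org.apache.spark.sql.functions._

// Pivot each val{x} column separately, then union the single-row results together
val valCols = df.columns.tail  // val1 .. val6
val transposed = valCols
  .map { c =>
    df.withColumn("vals", lit(c))
      .groupBy("vals")
      .pivot("segment_id")
      .sum(c)
  }
  .reduce(_ union _)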
Is there a more efficient way to do this transpose, given that I do not want to aggregate the data?
Thanks :)
Upvotes: 13
Views: 31634
Reputation: 75
Here is a solution for PySpark, using the pandas-on-Spark transpose: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.transpose.html
Here is the solution code for your problem:
Step 1: Convert to a pandas-on-Spark DataFrame and choose the columns
d = df.pandas_api()[['val1', 'val2', 'val3', 'val4', 'val5', 'val6', 'segment_id']]
This gives a data frame like this:
+----+----+----+----+----+----+----------+
|val1|val2|val3|val4|val5|val6|segment_id|
+----+----+----+----+----+----+----------+
| 100|   0|   0|   0|   0|   0|         1|
|   0|  50|   0|   0|  20|   0|         2|
|   0|   0|   0|   0|   0|   0|         3|
|   0|   0|   0|   0|   0|   0|         4|
+----+----+----+----+----+----+----------+
Step 2: Set segment_id as the index and transpose the whole table.
d_transposed = d.set_index('segment_id').T.sort_index()
This gives:
+----------+----+----+----+----+
|segment_id|   1|   2|   3|   4|
+----------+----+----+----+----+
|      val1| 100|   0|   0|   0|
|      val2|   0|  50|   0|   0|
|      val3|   0|   0|   0|   0|
|      val4|   0|   0|   0|   0|
|      val5|   0|  20|   0|   0|
|      val6|   0|   0|   0|   0|
+----------+----+----+----+----+
Step 3: Bring the index back as a column and rename it to vals:
d_transposed = d_transposed.reset_index().rename(columns={'index': 'vals'})
+----+----+----+----+----+
|vals|   1|   2|   3|   4|
+----+----+----+----+----+
|val1| 100|   0|   0|   0|
|val2|   0|  50|   0|   0|
|val3|   0|   0|   0|   0|
|val4|   0|   0|   0|   0|
|val5|   0|  20|   0|   0|
|val6|   0|   0|   0|   0|
+----+----+----+----+----+
Here is the full code:
d = df.pandas_api()[['val1', 'val2', 'val3', 'val4', 'val5', 'val6', 'segment_id']]
d_transposed = d.set_index('segment_id').T.sort_index()
d_transposed = d_transposed.reset_index().rename(columns={'index': 'vals'})
Upvotes: 1
Reputation: 11
In Python this can be done in a simple way: I normally convert the Spark DataFrame to pandas and use the pandas transpose (note that toPandas() collects all of the data to the driver):
spark_df.toPandas().T
Upvotes: 1
Reputation: 74
This should be a perfect solution.
import spark.implicits._
import org.apache.spark.sql.functions._

val seq = Seq((1, 100, 0, 0, 0, 0, 0), (2, 0, 50, 0, 0, 20, 0), (3, 0, 0, 0, 0, 0, 0), (4, 0, 0, 0, 0, 0, 0))
val df1 = seq.toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
df1.show()

// Melt: emit one (segment_id, column name, value) triple per cell
val schema = df1.schema
val df2 = df1.flatMap(row => {
  val metric = row.getInt(0)
  (1 until row.size).map(i => {
    (metric, schema(i).name, row.getInt(i))
  })
})
val df3 = df2.toDF("metric", "vals", "value")
df3.show()

// Pivot the segment ids back out as columns
val df4 = df3.groupBy("vals").pivot("metric").agg(first("value"))
df4.show()
Upvotes: -1
Reputation: 330203
Unfortunately, there is no case where the amount of data justifies a DataFrame and a transposition of that data is also feasible. You have to remember that a DataFrame, as implemented in Spark, is a distributed collection of rows, and each row is stored and processed on a single node.
You could express a transposition on a DataFrame as a pivot:
import org.apache.spark.sql.functions._
import spark.implicits._

// Explode each val{x} column into (column name, value) pairs
val kv = explode(array(df.columns.tail.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

df
  .withColumn("kv", kv)
  .select($"segment_id", $"kv.k", $"kv.v")
  .groupBy($"k")
  .pivot("segment_id")
  .agg(first($"v"))
  .orderBy($"k")
  .withColumnRenamed("k", "vals")
but it is merely toy code with no practical application. In practice it is not better than collecting the data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Collect to the driver, transpose locally, then rebuild a DataFrame
val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
  case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
  }
}

val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }

val schema = StructType(
  StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
)

spark.createDataFrame(sc.parallelize(rows), schema)
For a DataFrame defined as:
val df = Seq(
(1, 100, 0, 0, 0, 0, 0),
(2, 0, 50, 0, 0, 20, 0),
(3, 0, 0, 0, 0, 0, 0),
(4, 0, 0, 0, 0, 0, 0)
).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")
both would give you the desired result:
+----+---+---+---+---+
|vals| 1| 2| 3| 4|
+----+---+---+---+---+
|val1|100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+---+---+---+---+
That being said, if you need an efficient transposition of a distributed data structure, you'll have to look somewhere else. There are a number of structures, including the core CoordinateMatrix and BlockMatrix, which can distribute data across both dimensions and can be transposed.
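For example, a minimal sketch with CoordinateMatrix (assuming the df defined above; note that the val{x} column names are lost and would have to be tracked separately):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Build a CoordinateMatrix from the numeric cells:
// row index = segment_id - 1, column index = val{x} position (an illustrative mapping)
val entries = df.rdd.flatMap { row =>
  val segment = row.getInt(0)
  (1 until row.size).map(i => MatrixEntry(segment - 1, i - 1, row.getInt(i).toDouble))
}

val mat = new CoordinateMatrix(entries)
val transposed = mat.transpose()  // rows and columns swapped, still distributed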
Upvotes: 19