
Reputation: 1239

Transpose column to row with Spark

I'm trying to transpose some columns of my table to rows. I'm using Python and Spark 1.5.0. Here is my initial table:

|  A  |col_1|col_2|col_...|
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

I would like to have something like this:

|  A  | col_id | col_value |
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Does anyone know how I can do it? Thank you for your help.

Upvotes: 51

Views: 93627

Answers (9)


Reputation: 330393

Spark >= 3.4

You can use the built-in melt method. With Python:

    df.melt(
        ids=["A"], values=["col_1", "col_2"],
        variableColumnName="key", valueColumnName="val"
    )

With Scala:

df.melt(Array($"A"), Array($"col_1", $"col_2"), "key", "val")
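The following plain-Python sketch (no Spark involved) makes the semantics of melt concrete. Every column listed in values becomes its own output row, which carries the id columns, the column name and the value. The function name melt_rows and the list-of-dicts row representation are hypothetical and are used only for illustration. This is not Spark's implementation.

```python
def melt_rows(rows, ids, values, variable_col="key", value_col="val"):
    """Hypothetical plain-Python model of DataFrame.melt (not a Spark API).

    Each input row (a dict) yields one output row per column in `values`.
    """
    out = []
    for row in rows:
        id_part = {i: row[i] for i in ids}
        for name in values:
            # One long-format row: id columns + column name + column value
            out.append({**id_part, variable_col: name, value_col: row[name]})
    return out


rows = [{"A": 1, "col_1": 0.0, "col_2": 0.6},
        {"A": 2, "col_1": 0.6, "col_2": 0.7}]
for r in melt_rows(rows, ids=["A"], values=["col_1", "col_2"]):
    print(r)
```

Two input rows with two value columns produce four long-format rows. This matches the shape asked for in the question.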

Spark < 3.4

It is relatively simple to do with basic Spark SQL functions.


from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type descriptions
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL arrays are homogeneous, so all value columns must share one type
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
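The least obvious line in to_long is the `zip(*...)` "unzip" of `df.dtypes`. The plain-Python sketch below runs the same expression on a hand-written dtypes list, so no Spark session is needed. The sample list is a hypothetical stand-in for what `df.dtypes` reports for the example DataFrame.

```python
# Hypothetical stand-in for df.dtypes: a list of (column name, type string) pairs
dtypes = [("A", "bigint"), ("col_1", "double"), ("col_2", "double")]
by = ["A"]

# Keep the non-id columns, then "unzip" the pairs into two parallel tuples
cols, types = zip(*((c, t) for (c, t) in dtypes if c not in by))

print(cols)   # ('col_1', 'col_2')
print(types)  # ('double', 'double')

# The same homogeneity check that to_long performs
assert len(set(types)) == 1, "All columns have to be of the same type"
```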


import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))

Upvotes: 76


Reputation: 146

I found transposing in PySpark too complicated, so I just convert my DataFrame to pandas, use its transpose() method, and convert the result back to a PySpark DataFrame if required.

dfOutput = spark.createDataFrame(dfPySpark.toPandas().transpose())

Upvotes: 0

Gonza Piotti

Reputation: 757

You could use the stack function.

For example:

df.selectExpr("A", "stack(2, 'col_1', col_1, 'col_2', col_2) as (key, value)")


  • 2 is the number of columns to stack (col_1 and col_2)
  • 'col_1' is a string for the key
  • col_1 is the column from which to take the values

If you have several columns, you could build the whole stack string by iterating over the column names and pass it to selectExpr.
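Here is a minimal sketch of building that string in plain Python. The helper name build_stack_expr is hypothetical. The sketch assumes the column names are plain identifiers; names with spaces or special characters would also need backticks.

```python
def build_stack_expr(columns, key_name="key", value_name="value"):
    """Build a Spark SQL stack(...) expression for the given column names.

    Hypothetical helper: stack(n, 'c1', c1, 'c2', c2, ...) emits n rows per
    input row, one (name, value) pair per column.
    """
    pairs = ", ".join("'{0}', {0}".format(c) for c in columns)
    return "stack({}, {}) as ({}, {})".format(len(columns), pairs, key_name, value_name)


expr = build_stack_expr(["col_1", "col_2"])
print(expr)
# In PySpark you would then call, for example:
# df.selectExpr("A", expr)
```

For the two columns in the question, this reproduces the hand-written expression above.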

Upvotes: 9

Artem Zaika

Reputation: 1221

To transpose a DataFrame in PySpark, I pivot over a temporary column, which I drop at the end of the operation.

Say we have a table like this, and we want to find the total number of users for each listed_days_bin value:

|  listed_days_bin | users_count | 
|1                 |            5| 
|0                 |            2|
|0                 |            1| 
|1                 |            3|  
|1                 |            4| 
|2                 |            5| 
|2                 |            7|  
|2                 |            2|  
|1                 |            1|

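Create a new temporary column, 'pvt_value', group by it, and pivot the results:

import pyspark.sql.functions as F

agg_df = df.withColumn('pvt_value', F.lit(1))\
    .groupby('pvt_value')\
    .pivot('listed_days_bin')\
    .agg(F.sum('users_count'))\
    .drop('pvt_value')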

The new DataFrame should look like this:

|  0 | 1 | 2 | # Columns 
|   3| 13| 14| # Users over the bin
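In effect, the pivot sums users_count for each listed_days_bin and lays the sums out as columns of a single row. The plain-Python sketch below (no Spark) reproduces the expected result from the sample table, which is a quick way to check the numbers. It is a model of the aggregation, not what Spark runs internally.

```python
from collections import defaultdict

# Sample rows from the table above: (listed_days_bin, users_count)
rows = [(1, 5), (0, 2), (0, 1), (1, 3), (1, 4), (2, 5), (2, 7), (2, 2), (1, 1)]

# Sum users_count per bin, as groupby(...).pivot(...).agg(sum) does
totals = defaultdict(int)
for days_bin, users in rows:
    totals[days_bin] += users

# One "row" whose columns are the distinct bins, in sorted order
pivoted = {b: totals[b] for b in sorted(totals)}
print(pivoted)  # {0: 3, 1: 13, 2: 14}
```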

Upvotes: 0

Vamsi Prabhala

Reputation: 49270

One way to solve this with PySpark SQL is to use the functions create_map and explode.

from pyspark.sql import functions as func
# Use `create_map` to build a map from constant column names to column values
df = df.withColumn('mapCol',
                   func.create_map(func.lit('col_1'), df.col_1,
                                   func.lit('col_2'), df.col_2))
# Use the explode function to turn each map entry into its own row
res = df.select('*', func.explode(df.mapCol).alias('col_id', 'col_value'))

Upvotes: 14


Reputation: 11593

Use flatMap. Something like the following should work:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))

Upvotes: 2

Parul Singh

Reputation: 11

A very handy way to implement:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA, 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))

Upvotes: 1


Reputation: 7170

I took the Scala answer that @javadba wrote and created a Python version for transposing all columns in a DataFrame. This might be a bit different from what the OP was asking...

from itertools import chain
from pyspark.sql import DataFrame

def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]

def transpose(X):
    """Transpose a PySpark DataFrame.

    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be transposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

For example:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
| _1| _2| _3|
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|

>>> transpose(X).show()
| _1| _2| _3|
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|

Upvotes: 1


Reputation: 63221

Spark's local linear algebra libraries are currently very limited: they do not include basic operations such as the one above.

There is a JIRA ticket to fix this for Spark 2.1, but that will not help you today.

Something to consider: performing a transpose will likely require completely shuffling the data.

For now you will need to write RDD code directly. I have written transpose in Scala, but not in Python. Here is the Scala version:

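  // Assumed alias: DMatrix is not defined in the original snippet
  type DMatrix = Array[Array[Double]]

  def transpose(mat: DMatrix) = {
    val nCols = mat(0).length
    val matT = mat
      .flatten                      // row-major sequence of all cells
      .zipWithIndex                 // (value, position)
      .groupBy { _._2 % nCols }     // position % nCols = column index
      .toSeq.sortBy { _._1 }        // order the groups by column index
      .map(_._2.map(_._1))          // drop the positions, keep the values
      .toArray
    matT
  }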

You can convert that to Python for your use. I do not have the bandwidth to write and test that at the moment; let me know if you are unable to do the conversion.

At least the following are readily converted to Python:

  • zipWithIndex --> enumerate() (Python equivalent, credit to @zero323; note that enumerate yields (index, item) while zipWithIndex yields (item, index))
  • map --> [someOperation(x) for x in ..]
  • groupBy --> itertools.groupby() (it only groups consecutive items, so sort by the key first)

Here is an implementation of flatten, which has no built-in Python equivalent:

  def flatten(L):
      for item in L:
          try:
              for i in flatten(item):
                  yield i
          except TypeError:
              # Non-iterable items are leaves
              yield item

So you should be able to put those together for a solution.
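One way to put those pieces together for a local (non-distributed) list of rows is sketched below. It follows the same flatten, index and group-by-column pipeline as the Scala version. The name transpose_local is hypothetical. The sketch assumes a rectangular matrix of numbers. String cells would not work, because flatten recurses into strings until Python raises RecursionError.

```python
from itertools import groupby


def flatten(L):
    # Recursively yield leaf items; iterating a non-iterable raises TypeError
    for item in L:
        try:
            for i in flatten(item):
                yield i
        except TypeError:
            yield item


def transpose_local(mat):
    """Hypothetical local transpose of a rectangular list of rows."""
    n_cols = len(mat[0])
    # zipWithIndex --> enumerate (note: enumerate yields (index, item))
    indexed = list(enumerate(flatten(mat)))
    # groupBy position % n_cols; itertools.groupby needs input sorted by that key
    # (sorted is stable, so each column keeps its original row order)
    keyed = sorted(indexed, key=lambda iv: iv[0] % n_cols)
    return [[v for _, v in grp]
            for _, grp in groupby(keyed, key=lambda iv: iv[0] % n_cols)]


print(transpose_local([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))
# [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
```

This only runs on a single machine. On an RDD, the equivalent steps are the zipWithIndex, groupBy and sortBy calls used in the PySpark transpose answer above.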

Upvotes: 7
