Priyanshu

Reputation: 121

How to merge duplicate columns in pyspark?

I have a PySpark DataFrame in which some of the columns have the same name. I want to merge all the columns that share a name into a single column. For example, given this input DataFrame:

[Image: example input DataFrame]

How can I do this in PySpark? Any help would be highly appreciated.

Upvotes: 2

Views: 3600

Answers (2)

nonoDa

Reputation: 453

Edited to answer the OP's request to coalesce columns from a list.

Here's a reproducible example:

    import pyspark.sql.functions as F
    from pyspark.sql import SparkSession

    # In the pyspark shell `spark` already exists; otherwise create/reuse a session
    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        ("z", "a", None, None),
        ("b", None, "c", None),
        ("c", "b", None, None),
        ("d", None, None, "z"),
    ], ["a", "c", "c", "c"])

    df.show()

    # Fix duplicated column names: later occurrences get a _0, _1, ... suffix
    # (one counter i shared by all duplicated names)
    old_col = df.schema.names
    running_list = []
    new_col = []
    i = 0
    for column in old_col:
        if column in running_list:
            new_col.append(column + "_" + str(i))
            i = i + 1
        else:
            new_col.append(column)
            running_list.append(column)
    print(new_col)

    df1 = df.toDF(*new_col)

    # Columns to coalesce into one, and the helper columns to drop afterwards
    a = ['c', 'c_0', 'c_1']
    to_drop = ['c_0', 'c_1']
    b = [df1[col] for col in a]

    # coalesce keeps the first non-null value (left to right) in each row
    df_merged = df1.withColumn('c', F.coalesce(*b)).drop(*to_drop)

    df_merged.show()

Output:

+---+----+----+----+
|  a|   c|   c|   c|
+---+----+----+----+
|  z|   a|null|null|
|  b|null|   c|null|
|  c|   b|null|null|
|  d|null|null|   z|
+---+----+----+----+

['a', 'c', 'c_0', 'c_1']

+---+---+
|  a|  c|
+---+---+
|  z|  a|
|  b|  c|
|  c|  b|
|  d|  z|
+---+---+
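The renaming loop above shares one counter `i` across all duplicated names, and `a`/`to_drop` are written out by hand. A minimal pure-Python sketch, assuming you want one counter per name and want those lists derived from the schema (`dedupe_column_names` and `columns_to_coalesce` are hypothetical helper names, not part of PySpark):

```python
def dedupe_column_names(names):
    """Rename repeated column names: the first occurrence keeps its name,
    later ones become name_0, name_1, ... with one counter per name."""
    counts = {}
    renamed = []
    for name in names:
        if name in counts:
            renamed.append(f"{name}_{counts[name]}")
            counts[name] += 1
        else:
            renamed.append(name)
            counts[name] = 0
    return renamed


def columns_to_coalesce(old_names, new_names):
    """For every name that occurred more than once, return the renamed
    columns to pass to F.coalesce, first occurrence first."""
    groups = {}
    for old, new in zip(old_names, new_names):
        groups.setdefault(old, []).append(new)
    # Keep only the names that were actually duplicated
    return {old: group for old, group in groups.items() if len(group) > 1}


old = ["a", "c", "c", "c"]
new = dedupe_column_names(old)
print(new)                            # ['a', 'c', 'c_0', 'c_1']
print(columns_to_coalesce(old, new))  # {'c': ['c', 'c_0', 'c_1']}
```

Each value in the returned dict can serve as `a`, and its tail `group[1:]` as `to_drop`: for each `name, group` in `columns_to_coalesce(old_col, new_col).items()`, run `df1 = df1.withColumn(name, F.coalesce(*[df1[c] for c in group])).drop(*group[1:])`. The helper does not guard against an input that already contains a column named like `c_0`.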

Upvotes: 1

s.polam

Reputation: 10362

Check the Scala code below; it might help you.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

implicit class DFHelpers(df: DataFrame) {
   def mergeColumns(): DataFrame = {
       val dupColumns = df.columns
       // Make every name unique by appending its position, e.g. a -> a_0, b -> b_1, a -> a_2
       val newColumns = dupColumns.zipWithIndex.map { case (name, idx) => s"${name}_$idx" }
       // Group the unique names by their ORIGINAL column name (not just its first
       // character, which breaks for multi-character names) and coalesce each group
       val columns = dupColumns.zip(newColumns)
                        .groupBy(_._1)
                        .map { case (name, pairs) => coalesce(pairs.map(p => col(p._2)): _*).as(name) }
                        .toSeq
       df.toDF(newColumns: _*).select(columns: _*)
   }
}

// Exiting paste mode, now interpreting.
scala> df.show(false)
+----+----+----+----+----+----+
|a   |b   |a   |c   |a   |b   |
+----+----+----+----+----+----+
|4   |null|null|8   |null|21  |
|null|8   |7   |6   |null|null|
|96  |null|null|null|null|78  |
+----+----+----+----+----+----+
scala> df.printSchema
root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- a: string (nullable = true)
 |-- c: string (nullable = true)
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)

scala> df.mergeColumns.show(false)
+---+---+----+
|b  |a  |c   |
+---+---+----+
|21 |4  |8   |
|8  |7  |6   |
|78 |96 |null|
+---+---+----+
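The same idea translated to PySpark, as a minimal sketch rather than the answerer's code (`merge_plan` and `merge_duplicate_columns` are hypothetical names). It uses positional suffixes like the Scala version but collects the groups in a dict, so the merged columns come out in order of first appearance instead of the hash-map order seen in the Scala output. The Spark import sits inside the function so the pure-Python planning step can be tried without Spark installed.

```python
def merge_plan(columns):
    """Give each column a unique positional name (name_index) and group
    those names by the original column name, in order of first appearance."""
    unique = [f"{name}_{idx}" for idx, name in enumerate(columns)]
    groups = {}
    for name, new in zip(columns, unique):
        groups.setdefault(name, []).append(new)
    return unique, groups


def merge_duplicate_columns(df):
    """Collapse same-named columns of a PySpark DataFrame with coalesce."""
    # Imported here so merge_plan can be used without a Spark installation
    from pyspark.sql import functions as F

    unique, groups = merge_plan(df.columns)
    renamed = df.toDF(*unique)
    # One output column per original name: first non-null value wins
    return renamed.select(
        [F.coalesce(*[renamed[c] for c in group]).alias(name)
         for name, group in groups.items()]
    )


unique, groups = merge_plan(["a", "b", "a", "c", "a", "b"])
print(unique)  # ['a_0', 'b_1', 'a_2', 'c_3', 'a_4', 'b_5']
print(groups)  # {'a': ['a_0', 'a_2', 'a_4'], 'b': ['b_1', 'b_5'], 'c': ['c_3']}
```

Usage, assuming a DataFrame `df` with duplicated names like the one above: `merge_duplicate_columns(df).show()`. Because the index is always the last `_`-separated segment, the generated names cannot collide with each other.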

Upvotes: 1
