Boern

Reputation: 7752

Scala Spark: Performance issue renaming huge number of columns

To work with the column names of my DataFrame without escaping the ".", I need a function to "validify" all column names, but none of the methods I tried finishes in a reasonable time (I abort after 5 minutes).

The dataset I'm trying my algorithms on is the Golub dataset (get it here). It's a 2.2 MB CSV file with 7200 columns. Renaming all columns should be a matter of seconds.

Code to read the CSV:

var dfGolub = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("golub_merged.csv")
    .drop("_c0") // drop the first column
    .repartition(numOfCores)

Attempts to rename columns:

 def validifyColumnnames1(df : DataFrame) : DataFrame = {
     import org.apache.spark.sql.functions.col
     val cols = df.columns
     val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.","")))
     df.select(colsRenamed : _*)
 }


def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = {
    val newColumnNames = ArrayBuffer[String]()
    for(oldCol <- df.columns) {
        newColumnNames +=  oldCol.replaceAll("\\.","")
    }
    df.toDF(newColumnNames : _*)
}

def validifyColumnnames3(df : DataFrame) : DataFrame = {
    var newDf = df
    for(col <- df.columns){
        newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.",""))
    }
    newDf
}

Any ideas what is causing this performance issue?

Setup: I'm running Spark 2.1.0 on Ubuntu 16.04 in local[24] mode on a machine with 16 cores * 2 threads and 96 GB of RAM.

Upvotes: 4

Views: 1639

Answers (1)

Assaf Mendelson

Reputation: 12991

Assuming you know the types, you can simply create the schema instead of inferring it (inferring the schema costs performance and might even be wrong for CSV).

Let's assume for simplicity that you have the file example.csv as follows:

A.B, A.C, A.D
a,3,1

You can do something like this:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val schema = StructType(Seq(StructField("A_B", StringType), StructField("A_C", IntegerType), StructField("AD", IntegerType)))
val df = spark.read.option("header", "true").schema(schema).csv("example.csv")
df.show()

+---+---+---+
|A_B|A_C| AD|
+---+---+---+
|  a|  3|  1|
+---+---+---+

If you do not know the types in advance, you can infer the schema as you did before and then use the DataFrame to generate the new schema:

val fields = for {
  x <- df.schema
} yield StructField(x.name.replaceAll("\\.",""), x.dataType, x.nullable)
val schema = StructType(fields)

and then reread the DataFrame using that schema, as before.
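Putting the two steps together, a minimal sketch could look like the following. The object name SchemaSketch and the helpers stripDots and readWithCleanNames are hypothetical names chosen for illustration, and the sketch assumes the CSV has a header row and that the reader applies the supplied schema's names instead of the header names. I have not timed it against the 7200-column file.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object SchemaSketch {
  // Hypothetical helper: copy a schema, removing every "." from the field
  // names while keeping data type, nullability and metadata unchanged.
  def stripDots(schema: StructType): StructType =
    StructType(schema.fields.map(f => f.copy(name = f.name.replaceAll("\\.", ""))))

  // Hypothetical helper: infer the schema once, then read the file again
  // with the cleaned schema so no per-column rename is needed afterwards.
  def readWithCleanNames(spark: SparkSession, path: String): DataFrame = {
    val inferred = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(path)
      .schema
    spark.read
      .option("header", "true")
      .schema(stripDots(inferred))
      .csv(path)
  }
}
```

With the file from the question this would be called as SchemaSketch.readWithCleanNames(spark, "golub_merged.csv"). The file is read twice, which should be cheap for a 2.2 MB CSV.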

Upvotes: 5
