Shankar

Reputation: 8967

Adding two columns to existing DataFrame using withColumn

I have a DataFrame with a few columns. Now I want to add two more columns to the existing DataFrame.

Currently I am doing this using withColumn method in DataFrame.

for example:

df.withColumn("newColumn1", udf(col("somecolumn")))
  .withColumn("newColumn2", udf(col("somecolumn")))

Actually, I could return both new column values from a single UDF method using Array[String], but currently this is how I am doing it.

Is there any way I can do this more effectively? Is explode a good option here?

Even if I use explode, I still have to call withColumn once, return the column value as Array[String], and then use explode to create the two new columns.

Which approach is more efficient? Are there any alternatives?

**Update:** As @blert's answer explains, withColumns is the way to go.

Upvotes: 41

Views: 106075

Answers (2)

blert

Reputation: 352

May 2023: It is now possible, with the new withColumns method (note the final 's'), to add several columns to an existing Spark DataFrame without calling withColumn several times. You just need a Map[String, Column]. Given two UDFs for this example, udf1 and udf2, you could use the new method like this:

val dfNew = df.withColumns(Map(
  "newCol1" -> udf1(col("oldCol1")),
  "newCol2" -> udf2(col("oldCol2"))
))

More information can now be found in the official documentation.
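A minimal, self-contained sketch of the same idea (assuming Spark 3.3+, where withColumns accepts a Map; the sample data, column names, and the two UDF bodies are made up here purely for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().master("local[*]").appName("withColumnsDemo").getOrCreate()
import spark.implicits._

val df = Seq(("Peter", "Smith"), ("John", "Doe")).toDF("oldCol1", "oldCol2")

// Hypothetical UDFs standing in for udf1/udf2 from the answer
val udf1 = udf((s: String) => s.toUpperCase())
val udf2 = udf((s: String) => s.toLowerCase())

// One call adds both columns; no repeated withColumn
val dfNew = df.withColumns(Map(
  "newCol1" -> udf1(col("oldCol1")),
  "newCol2" -> udf2(col("oldCol2"))
))

dfNew.show()
```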

Upvotes: 7

Raphael Roth

Reputation: 27373

AFAIK you need to call withColumn twice (once for each new column). But if your UDF is computationally expensive, you can avoid calling it twice by storing the "complex" result in a temporary column and then "unpacking" that result, e.g. using the apply method of Column (which gives access to array elements). Note that it is sometimes necessary to cache the intermediate result (to prevent the UDF being called twice per row during the unpacking), and sometimes not; this seems to depend on how Spark optimizes the plan:

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

val myUDf = udf((s: String) => Array(s.toUpperCase(), s.toLowerCase()))

val df = sc.parallelize(Seq("Peter", "John")).toDF("name")

val newDf = df
  .withColumn("udfResult", myUDf(col("name"))).cache
  .withColumn("uppercaseColumn", col("udfResult")(0))
  .withColumn("lowercaseColumn", col("udfResult")(1))
  .drop("udfResult")

newDf.show()

gives

+-----+---------------+---------------+
| name|uppercaseColumn|lowercaseColumn|
+-----+---------------+---------------+
|Peter|          PETER|          peter|
| John|           JOHN|           john|
+-----+---------------+---------------+

With a UDF returning a tuple (here mirroring the array example above, so the uppercase value comes first), the unpacking would look like this:

val myTupleUDf = udf((s: String) => (s.toUpperCase(), s.toLowerCase()))

val newDf = df
    .withColumn("udfResult", myTupleUDf(col("name"))).cache
    .withColumn("uppercaseColumn", col("udfResult._1"))
    .withColumn("lowercaseColumn", col("udfResult._2"))
    .drop("udfResult")
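The unpacking can also be done in a single select instead of chained withColumn calls; a sketch of this variant, assuming the same df with a name column (the UDF name here is made up):

```scala
import org.apache.spark.sql.functions.{col, udf}

// Compute the expensive UDF once into a struct column...
val caseUdf = udf((s: String) => (s.toUpperCase(), s.toLowerCase()))

// ...then project both fields out of it in one pass
val newDf = df
  .withColumn("udfResult", caseUdf(col("name")))
  .select(
    col("name"),
    col("udfResult._1").as("uppercaseColumn"),
    col("udfResult._2").as("lowercaseColumn")
  )
```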

Upvotes: 69
