J Calbreath

Reputation: 2705

Create new column with function in Spark Dataframe

I'm trying to figure out the new DataFrame API in Spark. It seems like a good step forward, but I'm having trouble doing something that should be pretty simple. I have a DataFrame with two columns, "ID" and "Amt". As a generic example, say I want to add a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", coder(myDF("Amt")))

I get type mismatch errors:

found   : org.apache.spark.sql.Column
required: Integer

I've tried changing the input type of my function to org.apache.spark.sql.Column, but then the function fails to compile because the if statement wants a Boolean.

Am I doing this wrong? Is there a better/another way to do this than using withColumn?

Thanks for your help.

Upvotes: 42

Views: 80799

Answers (3)

imran

Reputation: 111

Another way of doing this: you can use any function you like, but as the error above shows, withColumn needs something that accepts a Column. So define the function as a val wrapped in udf.

Example:

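import org.apache.spark.sql.functions.udf

// Wrap the plain Scala function so it can be applied to a Column
val coder = udf((myAmt: Integer) => {
  if (myAmt > 100) "Little"
  else "Big"
})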

Now this statement works perfectly:

myDF.withColumn("Code", coder(myDF("Amt")))

Upvotes: 4

Ramesh Maharjan

Reputation: 41957

We should avoid defining UDFs where possible, because of the overhead of serializing and deserializing column values.

You can get the same result with Spark's built-in when function, as below:

import org.apache.spark.sql.functions.when

val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")

myDF.withColumn("Code", when(myDF("Amt") < 100, "Little").otherwise("Big"))
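
If you want to keep the coder(myDF("Amt")) call shape from the question, one option is to have the function take a Column and return a Column built from when/otherwise. The following is only a minimal sketch, not a definitive implementation. It assumes Spark 2.x or later (SparkSession instead of sqlContext) and a local master. The object name CoderExample and the in-memory sample rows are made up for illustration and stand in for the parquet file.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.when

object CoderExample {
  // Column in, Column out: the comparison is built as a Spark expression,
  // so no UDF is involved. Threshold and labels follow the answer above.
  def coder(amt: Column): Column =
    when(amt < 100, "Little").otherwise("Big")

  def main(args: Array[String]): Unit = {
    // Assumption: local session, used here only to make the sketch runnable.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("coder-example")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for the parquet file.
    val myDF = Seq((1, 50), (2, 150), (3, 100)).toDF("ID", "Amt")

    // Same call shape as in the question.
    myDF.withColumn("Code", coder(myDF("Amt"))).show()

    spark.stop()
  }
}
```

With these sample rows, only the row with Amt 50 falls under the threshold and gets "Little"; 100 and 150 get "Big".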

Upvotes: 17

yjshen

Reputation: 6693

Let's say you have an "Amt" column in your schema:

import org.apache.spark.sql.functions._
val myDF = sqlContext.parquetFile("hdfs:/to/my/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
myDF.withColumn("Code", sqlfunc(col("Amt")))

I think withColumn is the right way to add a column.

Upvotes: 64
