user1128482

Reputation: 185

Creating a new Spark DataFrame with new column value based on column in first dataframe Java

This should be easy, but... I'm using Spark 1.6.1. I have DataFrame #1 with columns A, B, C, with values:

A  B  C
1  2  A
2  2  A
3  2  B
4  2  C

I then create a new dataframe with a new column D so:

DataFrame df2 = df1.withColumn("D", df1.col("C"));

So far so good, but I actually want the value in column D to be conditional, i.e.:

// pseudo code
if (col C == "A") then col D = "X"
else if (col C == "B") then col D = "Y"
else col D = "Z"

I'll then drop column C and rename D to C. I've tried looking at the Column functions, but nothing appears to fit the bill. I thought of using df1.rdd().map() and iterating over the rows, but aside from not actually managing to get it to work, I thought the whole point of DataFrames was to move away from the RDD abstraction?

Unfortunately I have to do this in Java (and of course Spark with Java is not optimal!!). It seems like I'm missing the obvious and am happy to be shown to be an idiot when presented with the solution!

Upvotes: 12

Views: 28953

Answers (3)

sudeepgupta90

Reputation: 795

You may also use UDFs to do the same job; just write a simple if-then-else inside the function. In Scala, for your example:

import org.apache.spark.sql.functions.udf

val customFunct = udf { (c: String) =>
  if (c == "A") "X"
  else if (c == "B") "Y"
  else "Z"
}

val newDF = df.withColumn("D", customFunct(df("C")))

Upvotes: 2

Daniel de Paula

Reputation: 17872

I believe you can use when to achieve that. Additionally, you can probably replace the old column directly. For your example, the code would be something like:

import static org.apache.spark.sql.functions.*;

Column newCol = when(col("C").equalTo("A"), "X")
    .when(col("C").equalTo("B"), "Y")
    .otherwise("Z");

DataFrame df2 = df1.withColumn("C", newCol);

For more details about when, check the Column Javadoc.
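For intuition, the when/.../otherwise chain evaluates its conditions top-down and falls through to the default, much like a plain if/else if/else. A minimal plain-Java sketch of that behavior (no Spark involved; the class and method names here are hypothetical, chosen just for illustration):

```java
public class WhenOtherwiseSketch {
    // Mirrors: when(col("C").equalTo("A"), "X")
    //              .when(col("C").equalTo("B"), "Y")
    //              .otherwise("Z")
    static String mapC(String c) {
        if ("A".equals(c)) {
            return "X";  // first when(...) branch
        } else if ("B".equals(c)) {
            return "Y";  // second when(...) branch
        } else {
            return "Z";  // otherwise(...) default
        }
    }

    public static void main(String[] args) {
        // Column C values from the question's example rows
        for (String c : new String[] {"A", "A", "B", "C"}) {
            System.out.println(c + " -> " + mapC(c));
        }
    }
}
```

Spark applies the same per-row logic as a column expression, so it runs inside the engine without dropping down to RDDs.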

Upvotes: 21

user1128482

Reputation: 185

Thanks to Daniel I have resolved this :)

The missing piece was the static import of the SQL functions:

import static org.apache.spark.sql.functions.*;

I must have tried a million different ways of using when, but got compile failures/runtime errors because I didn't do the import. Once imported, Daniel's answer was spot on!

Upvotes: 3
