Arjun
Arjun

Reputation: 271

Append new column to spark DF based on logic

I need to add a new column to the DF below based on other columns. Here is the DF schema:

scala> a.printSchema()
root
 |-- ID: decimal(22,0) (nullable = true)
 |-- NAME: string (nullable = true)
 |-- AMOUNT: double (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- NAME1: string (nullable = true)
 |-- code1: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- revised_code: string (nullable = true)

Now I want to add a column, say flag, as per the conditions below:

 1. if code == revised_code, then flag is P
 2. if code != revised_code, then flag is I
 3. if both code and revised_code are null, then no flag.

This is the udf that I am trying, but it's giving I for both cases 1 and 3.

 def tagsUdf =
   udf((code: String, revised_code: String) =>
     if (code == null && revised_code == null) ""
     else if (code == revised_code) "P"
     else "I")


tagsUdf(col("CODE"), col("revised_code"))

Can anyone please point out what mistake I am making?

I/P DF

+-------------+-------+------------+
|NAME         |   CODE|revised_code|
+-------------+-------+------------+
|amz          |   null|        null|
|Watch        |   null|        5812|
|Watch        |   null|        5812|
|Watch        |   5812|        5812|
|amz          |   null|        null|
|amz          |   9999|        4352|
+-------------+-------+------------+
Schema:
root
 |-- MERCHANT_NAME: string (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- revised_mcc: string (nullable = true)

O/P DF

+-------------+-------+------------+----+
|NAME         |   CODE|revised_code|flag|
+-------------+-------+------------+----+
|amz          |   null|        null|null|
|Watch        |   null|        5812|   I|
|Watch        |   null|        5812|   I|
|Watch        |   5812|        5812|   P|
|amz          |   null|        null|null|
|amz          |   9999|        4352|   I|
+-------------+-------+------------+----+

Upvotes: 0

Views: 252

Answers (1)

Ramesh Maharjan
Ramesh Maharjan

Reputation: 41957

You don't need a udf function for that. A simple nested when inbuilt function should do the trick:

import org.apache.spark.sql.functions._
df.withColumn("CODE", col("CODE").cast("string"))
  .withColumn("flag",
    when((isnull(col("CODE")) || col("CODE") === "null") &&
         (isnull(col("revised_code")) || col("revised_code") === "null"), "")
      .otherwise(when(col("CODE") === col("revised_code"), "P").otherwise("I")))
  .show(false)

Here, the CODE column is cast to string type before the when logic is applied, so that CODE and revised_code have matching datatypes when compared.

Note: the CODE column is an IntegerType, and it cannot be null in any case.
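For reference, the same three-way flag logic can be sketched as a plain Scala function over Option values, which model nullable columns more faithfully than bare String references (the name flag and the Option signature are illustrative, not part of the question's code):

```scala
// Sketch: the flag logic from the question, with Option standing in
// for nullable columns. CODE is an Int, revised_code a String, so the
// comparison goes through toString, mirroring the cast in the answer.
def flag(code: Option[Int], revisedCode: Option[String]): Option[String] =
  (code, revisedCode) match {
    case (None, None)                          => None      // both null: no flag
    case (Some(c), Some(r)) if c.toString == r => Some("P") // codes match
    case _                                     => Some("I") // mismatch or one-sided null
  }
```

For example, flag(Some(5812), Some("5812")) yields Some("P"), flag(None, Some("5812")) yields Some("I"), and flag(None, None) yields None, matching the O/P DF above.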

Upvotes: 1
