Vicky C

Reputation: 53

Adding columns in Spark dataframe based on rules

I have a dataframe df, which contains the data below:

**customers**   **product**   **Val_id**
     1               A            1
     2               B            X
     3               C               
     4               D            Z

I have been provided 2 rules, which are as below:

**rule_id**   **rule_name**  **product value**  **priority**
   123              ABC             A,B               1
   456              DEF             A,B,D             2

The requirement is to apply these rules on dataframe df in priority order: customers who have passed rule 1 should not be considered for rule 2, and in the final dataframe two more columns, rule_id and rule_name, should be added. I have written the code below to achieve it:

val rule_name = when(col("product").isin("A", "B"), "ABC")
  .otherwise(when(col("product").isin("A", "B", "D"), "DEF").otherwise(""))
val rule_id = when(col("product").isin("A", "B"), "123")
  .otherwise(when(col("product").isin("A", "B", "D"), "456").otherwise(""))
val df1 = df_customers.withColumn("rule_name", rule_name).withColumn("rule_id", rule_id)
df1.show()

Final output looks like below:

**customers**   **product**   **Val_id**  **rule_name**  **rule_id**
     1               A            1           ABC            123
     2               B            X           ABC            123
     3               C               
     4               D            Z           DEF            456

Is there any better way to achieve this, adding both columns by going through the entire dataset once instead of going through it twice?

Upvotes: 2

Views: 936

Answers (2)

Ram Ghadiyaram

Reputation: 29165

Question: Is there any better way to achieve this, adding both columns by going through the entire dataset once instead of going through it twice?

Answer: you can have a Map return type in Scala...

Limitation: this works if you are using the UDF with withColumn to add a single column, for example a column named ruleIDandRuleName; in that case you can use one function returning a Map (or any other data type acceptable as a Spark SQL column). Otherwise you can't use the approach shown in the example snippet below.

import org.apache.spark.sql.functions.udf

// returns both the rule name and the rule id as a single Map column
def ruleNameAndruleId = udf((product: String) => {
  if (Seq("A", "B").contains(product)) Map("ruleName" -> "ABC", "ruleId" -> "123")
  else if (Seq("A", "B", "D").contains(product)) Map("ruleName" -> "DEF", "ruleId" -> "456")
  else Map("ruleName" -> "", "ruleId" -> "")
})

The caller will be:

df.withColumn("ruleIDandRuleName",ruleNameAndruleId(product here) ) // ruleNameAndruleId  will return a map containing rulename and rule id

Upvotes: 2

Ramesh Maharjan

Reputation: 41957

An alternative to your solution would be to use udf functions. It's almost similar to the when function, as both require serialization and deserialization. It's up to you to test which is faster and more efficient.

import org.apache.spark.sql.functions.{col, udf}

// maps a product to its rule name, in priority order
def rule_name = udf((product : String) => {
  if(Seq("A", "B").contains(product)) "ABC"
  else if(Seq("A", "B", "D").contains(product)) "DEF"
  else ""
})

// maps a product to its rule id, in priority order
def rule_id = udf((product : String) => {
  if(Seq("A", "B").contains(product)) "123"
  else if(Seq("A", "B", "D").contains(product)) "456"
  else ""
})

val df1 = df_customers.withColumn("rule_name", rule_name(col("product"))).withColumn("rule_id", rule_id(col("product")))
df1.show()
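For a quick, rough comparison of the two variants, one option (assuming spark is your active SparkSession, Spark 2.1+) is to wrap an action over each version in spark.time, which prints the wall-clock time of the enclosed block:

// rough timing sketch; count() is just an action to force the columns to be evaluated
spark.time {
  df_customers
    .withColumn("rule_name", rule_name(col("product")))
    .withColumn("rule_id", rule_id(col("product")))
    .count()
}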

Upvotes: 1
