Surender Raja

Reputation: 3599

How do I send multiple columns to a UDF from a when clause in a Spark DataFrame?

I want to join two DataFrames using a full outer join and add a new column to the joined result that tells me which records match, which records are unmatched and come only from the left DataFrame, and which are unmatched and come only from the right DataFrame.
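
To make the three categories concrete, here is a minimal plain-Scala sketch (no Spark required) of the rows a full outer join on the account id produces. It is a simplified model, not Spark's implementation: FullOuterJoinModel and fullOuterJoin are hypothetical names, ids are assumed unique and non-null on each side, and None stands in for the null that Spark fills in for the missing side.

```scala
// Hypothetical, simplified model of a full outer join on account ids.
// Assumption: ids are unique and non-null on each side, so every credit id
// matches at most one debit id and vice versa.
object FullOuterJoinModel {
  // Each result row is (credit_account_id, debit_account_id); None plays the
  // role of the SQL null that Spark puts on the side with no match.
  def fullOuterJoin(
      creditIds: Seq[String],
      debitIds: Seq[String]
  ): Seq[(Option[String], Option[String])] = {
    val debitSet  = debitIds.toSet
    val creditSet = creditIds.toSet
    // Every credit row survives; its debit side is filled only on a match.
    val creditSide = creditIds.map { id =>
      (Option(id), if (debitSet.contains(id)) Option(id) else None)
    }
    // Debit rows without a matching credit get an empty (null) credit side.
    val debitOnly = debitIds.filterNot(creditSet.contains).map { id =>
      (Option.empty[String], Option(id))
    }
    creditSide ++ debitOnly
  }
}
```

For example, fullOuterJoin(Seq("a", "b"), Seq("b", "c")) yields (Some(a), None), (Some(b), Some(b)) and (None, Some(c)): a null debit id marks a credit with no partner, and a null credit id marks a debit with no partner.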

Here is my spark code:

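import com.databricks.spark.avro._      // adds .avro(...) to DataFrameReader (spark-avro package)
import org.apache.spark.sql.functions._ // col, when, udf

val creditLoc = "/data/accounts/credits/year=2016/month=06/day=02"
val debitLoc  = "/data/accounts/debits/year=2016/month=06/day=02"
val creditDF  = sqlContext.read.avro(creditLoc)
val debitDF   = sqlContext.read.avro(debitLoc)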
val credit  =  creditDF.withColumnRenamed("account_id","credit_account_id").as("credit")
val debit   =  debitDF.withColumnRenamed("account_id","debit_account_id").as("debit")
val fullOuterDF =  credit.join(debit,credit("credit_account_id") === debit("debit_account_id"),"full_outer")
val CREDIT_DEBIT_CONSOLIDATE_SCHEMA=List(
  ("credit.credit_account_id","string"),
  ("credit.channel_name",  "string"),
  ("credit.service_key",  "string"),
  ("credit.trans_id", "string"),
  ("credit.trans_dt",  "string"),
  ("credit.trans_amount",  "string"),
  ("debit.debit_account_id","string"),
  ("debit.icf_number","string"),
  ("debit.debt_amount","string")
)

val columnNamesList = CREDIT_DEBIT_CONSOLIDATE_SCHEMA.map(elem => col(elem._1)).seq 
val df  = fullOuterDF.select(columnNamesList:_*)

val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
    when(df("debit_account_id").isNull,"UNMATCHED_CREDIT").otherwise(
      when(df("credit_account_id").isNull,"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)

So far I have implemented the "matching_type" logic directly in a when clause, and written that way the code works. Now I want to move the "matching_type" logic into a UDF.

The UDFs below each accept a single column as a parameter. How do I create a UDF that accepts multiple columns and returns a boolean based on conditions inside the UDF?

val isUnMatchedCREDIT = udf[Boolean, String](credit_account_id => {
  credit_account_id == null
})

val isUnMatchedDEBIT = udf[Boolean, String](debit_account_id => {
  debit_account_id == null
})


val caseDF = df.withColumn("matching_type",
  when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
    when(isUnMatchedCREDIT(df("credit_account_id")),"UNMATCHED_CREDIT").otherwise(
      when(isUnMatchedDEBIT(df("debit_account_id")),"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
    )
  )
)

Basically, I want to create another UDF, isMatchedCREDITDEBIT(), that accepts two columns, credit_account_id and debit_account_id, and returns true if both values are equal and false otherwise. In other words, I want to create a UDF for the logic below:

when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT")

I have tried the following, but it throws a compile-time error:

val isMatchedCREDITDEBIT()= udf[Boolean, String,String](credit_account_id => {
  credit_account_id == debit_account_id 
})

Could someone help me with this?

Upvotes: 1

Views: 1769

Answers (1)

Shaido

Reputation: 28322

You can create a UDF that takes two columns and performs your logic like this:

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
  credit_account_id == debit_account_id
})

which can then be called inside the when clause:

when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")
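
One behavioral difference is worth checking. Scala's == on references tests for null before calling equals, so this UDF will not throw on the null ids a full outer join produces, but it returns true when both ids are null. Spark's === instead yields null when either side is null, and when treats a null condition as not satisfied; the built-in null-safe operator <=> is the closer equivalent of the UDF. The predicate can be checked as a plain function (MatchPredicate and isMatched are hypothetical names for this sketch):

```scala
// Same comparison as the two-column UDF, kept as a plain Scala function
// so its null behavior can be checked without a SparkSession.
object MatchPredicate {
  // Scala's == is null-safe on references: null == null is true,
  // and "42" == null is false (no NullPointerException).
  def isMatched(creditAccountId: String, debitAccountId: String): Boolean =
    creditAccountId == debitAccountId
}
```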

However, it would be easier to create a single UDF for all the logic you are performing on the two columns. The UDF below takes both columns as input and returns the string you want instead of a boolean.

val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
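  if (credit_account_id == null) {
    // no credit side: the row came only from the debit DataFrame
    "UNMATCHED_DEBIT"
  } else if (debit_account_id == null) {
    // no debit side: the row came only from the credit DataFrame
    "UNMATCHED_CREDIT"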
  } else if (credit_account_id == debit_account_id){
    "MATCHING_CREDIT_DEBIT"
  } else {
    "INVALID_MATCHING_TYPE"
  }
})

val caseDF = df.withColumn("matching_type", 
  isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")))
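
A related pattern is to keep the classification in an ordinary Scala function and wrap it as a UDF only at the edge, so the logic can be unit-tested without a SparkSession. This is a minimal sketch under that assumption; MatchingType and classify are hypothetical names, and the mapping follows the question's original when clause (a null debit_account_id means UNMATCHED_CREDIT, a null credit_account_id means UNMATCHED_DEBIT).

```scala
// Hypothetical plain-Scala version of the matching_type logic, mirroring the
// order of the question's when/otherwise chain.
object MatchingType {
  def classify(creditAccountId: String, debitAccountId: String): String =
    // Like Spark's ===, a null id never counts as a match.
    if (creditAccountId != null && creditAccountId == debitAccountId)
      "MATCHING_CREDIT_DEBIT"
    else if (debitAccountId == null) "UNMATCHED_CREDIT" // credit had no debit
    else if (creditAccountId == null) "UNMATCHED_DEBIT" // debit had no credit
    else "INVALID_MATCHING_TYPE"
}
```

In Spark the function can then be wrapped with udf(MatchingType.classify _) and applied with withColumn exactly like the UDF above. Keep in mind that a UDF is opaque to Spark's Catalyst optimizer, so for logic this simple the built-in when/otherwise expression from the question is usually the cheaper choice; the UDF route mainly pays off once the logic grows.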

Upvotes: 2
