Reputation: 3599
I want to join two dataframes on basis on full_outer_join and trying to add a new column in the joined result set which tells me the matching records, unmatched records from left dataframe alone and unmatched records from right dataframe alone.
Here is my spark code:
val creditLoc ="/data/accounts/credits/year=2016/month=06/day=02"
val debitLoc = "/data/accounts/debits/year=2016/month=06/day=02"
val creditDF = sqlContext.read.avro(creditLoc)
val debitDF = sqlContext.read.avro(debitLoc)
val credit = creditDF.withColumnRenamed("account_id","credit_account_id").as("credit")
val debit = debitDF.withColumnRenamed("account_id","debit_account_id").as("debit")
val fullOuterDF = credit.join(debit,credit("credit_account_id") === debit("debit_account_id"),"full_outer")
val CREDIT_DEBIT_CONSOLIDATE_SCHEMA=List(
("credit.credit_account_id","string"),
("credit.channel_name", "string"),
("credit.service_key", "string"),
("credit.trans_id", "string"),
("credit.trans_dt", "string"),
("credit.trans_amount", "string"),
("debit.debit_account_id","string"),
("debit.icf_number","string"),
("debit.debt_amount","string")
)
val columnNamesList = CREDIT_DEBIT_CONSOLIDATE_SCHEMA.map(elem => col(elem._1)).seq
val df = fullOuterDF.select(columnNamesList:_*)
val caseDF = df.withColumn("matching_type",
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
when(df("debit_account_id").isNull,"UNMATCHED_CREDIT").otherwise(
when(df("credit_account_id").isNull,"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
)
)
)
As of now I applied the logic for "matching_type" inside a when
clause itself, but now I want to write the logic of "matching_type" inside an UDF
.
If the write like the above the code works.
The below UDF
s accepts a single column as parameter, how do I create a udf that accepts multiple columns and return a boolean based on conditions inside that udf?
val isUnMatchedCREDIT = udf[Boolean, String](credit_account_id => {
credit_account_id == null
})
val isUnMatchedDEBIT = udf[Boolean, String](debit_account_id => {
debit_account_id == null
})
val caseDF = df.withColumn("matching_type",
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT").otherwise(
when(isUnMatchedCREDIT(df("credit_account_id")),"UNMATCHED_CREDIT").otherwise(
when(isUnMatchedDEBIT(df("debit_account_id")),"UNMATCHED_DEBIT").otherwise("INVALID_MATCHING_TYPE")
)
)
)
)
Basically I want to create another UDF
as isMatchedCREDITDEBIT()
that accepts two columns credit_account_id
and debit_account_id
and that UDF
should return true if both the values are equal else false. In simple words, I want to created an UDF
for the below logic:
when(df("credit_account_id") === df("debit_account_id"),"MATCHING_CREDIT_DEBIT")
I have tried this but it is throwing compile type error:
val isMatchedCREDITDEBIT()= udf[Boolean, String,String](credit_account_id => {
credit_account_id == debit_account_id
})
Could someone help me on this?
Upvotes: 1
Views: 1769
Reputation: 28322
You can create an udf
that takes two columns and perform your logic like this:
val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
credit_account_id == debit_account_id
})
which can be called in the when
clause
when(isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")), "MATCHING_CREDIT_DEBIT")
However, it would be easier to create a single udf
for all the logic you are performing on the two columns. The udf
below takes both columns as input and returns the string you want, instead of a boolean.
val isMatchedCREDITDEBIT = udf((credit_account_id: String, debit_account_id: String) => {
if(credit_account_id == null){
"UNMATCHED_CREDIT"
} else if (debit_account_id == null){
"UNMATCHED_DEBIT"
} else if (credit_account_id == debit_account_id){
"MATCHING_CREDIT_DEBIT"
} else {
"INVALID_MATCHING_TYPE"
}
})
val caseDF = df.withColumn("matching_type",
isMatchedCREDITDEBIT(df("credit_account_id"), df("debit_account_id")))
Upvotes: 2