j pavan kumar

Reputation: 369

Define input parameters for a UDF in Spark Scala

I am working in Scala and Spark.

I am defining a UDF; here it is:

def udfcrpentcd = udf((CORP_ENT_CD:String)=>{
     state_name match{
      case "IL1" if  state_name.contains("IL1")=> "IL1"
      case "OK1" if  state_name.contains("OK1")=> "OK1"
      case "TX1" if  state_name.contains("TX1")=> "TX1"
      case "NM1" if  state_name.contains("NM1")=> "NM1"
      case "MT1" if  state_name.contains("MT1")=> "MT1"
      case _ =>"Null"
    }})




val local_masterdb =old_dataframe_temp_masterdbDataFrame.withColumn("new_columna_name_CORP_ENT_CD",udfcrpentcd(old_dataframe_temp_masterdbDataFrame("last_column_of_old_dataframe_DB_STATUS")+1))
    local_masterdb.show()

Now I want to reuse the above UDF.

I want to make it universal: instead of comparing state_name, I need to pass a string and have it return CRP_ENT_CD. That is what I want to do.

Is this the right way?

def udfcrpentcd (input_parameter:String) = udf((CORP_ENT_CD:String)=>{
     input_parameter match{
      case "IL1" if  input_parameter.contains("IL1")=> "IL1"
      case "OK1" if  input_parameter.contains("OK1")=> "OK1"
      case "TX1" if  input_parameter.contains("TX1")=> "TX1"
      case "NM1" if  input_parameter.contains("NM1")=> "NM1"
      case "MT1" if  input_parameter.contains("MT1")=> "MT1"
      case _ =>"Null"
    }})

If this is the right way, how do I call it? Any help with passing parameters would be appreciated.

Upvotes: 0

Views: 443

Answers (1)

srujana

Reputation: 182

Here is an example of how you can pass parameters to a UDF.

import org.apache.spark.sql.functions.udf

// Plain Scala function: returns the code if it is one of the known values, otherwise "Null"
def udfcrpentcd: String => String = (input_parameter: String) => {
  input_parameter match {
    case "IL1" => "IL1"
    case "OK1" => "OK1"
    case "TX1" => "TX1"
    case "NM1" => "NM1"
    case "MT1" => "MT1"
    case _ => "Null"
  }
}

// Wrap the function as a Spark UDF so it can be applied to a column
val udfcrpentcd_res = udf(udfcrpentcd)

val local_masterdb = old_dataframe_temp_masterdbDataFrame.withColumn("new_columna_name_CORP_ENT_CD",udfcrpentcd_res(old_dataframe_temp_masterdbDataFrame("last_column_of_old_dataframe_DB_STATUS")+1))
local_masterdb.show()
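
Since the matching logic does not depend on Spark, it may help to keep it in a plain Scala function that can be checked on its own before wrapping it with udf. The following is only a sketch under that assumption; the names knownCodes and corpEntCd are hypothetical and not part of the original code, and the Set lookup is swapped in for the match expression.

```scala
// Hypothetical names for illustration: knownCodes and corpEntCd are assumptions.
val knownCodes: Set[String] = Set("IL1", "OK1", "TX1", "NM1", "MT1")

// Returns the input when it is one of the known codes, otherwise the string
// "Null" (the same fallback the match expression uses).
// Option(...) turns a null input into None, so null also yields "Null".
val corpEntCd: String => String =
  (code: String) => Option(code).filter(knownCodes.contains).getOrElse("Null")

println(corpEntCd("TX1")) // prints TX1
println(corpEntCd("ZZ9")) // prints Null
println(corpEntCd(null))  // prints Null
```

In a Spark session, a function like this could be wrapped with udf(corpEntCd) from org.apache.spark.sql.functions and applied to a column in the same way as udfcrpentcd_res above.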

Upvotes: 1
