Kiwy

Reputation: 346

spark sql udf cast return value

I have a variable declared like this:

val jobnameSeq = Seq( ("42409245", "name12"),("42461545", "name4"),("42409291", "name1"),("42413872", "name3"),("42417044", "name2"))

I want to create a function usable in Spark SQL that substitutes 42461545 with name4 in an SQL query.
I tried to declare this function:

val jobnameDF = jobnameSeq.toDF("jobid","jobname")
sqlContext.udf.register("getJobname", (id: String) => (
     jobnameDF.filter($"jobid" === id).select($"jobname")
    )
)

To be used like this in SQL:

select getjobname(jobid), other, field from table  

But jobnameDF.filter($"jobid" === id).select($"jobname") returns a DataFrame, not a String, and I can't figure out how to simply convert this value to a String, as there will only be one result each time.

If a Seq is not the right object to use in this case, I'm open to suggestions.

Edit:
Though the suggested answer works, here's exactly what I did to make it work:

// Convert my Seq to a map
val jobMap = jobnameSeq.toMap

// Declare a SQL function so I could use it in Spark SQL
// (it needs to be accessible to people who don't know Scala)
sqlContext.udf.register("getJobname", (id: String) => jobMap(id))
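With the UDF registered, it can be called from Spark SQL as in the question. A minimal usage sketch; the view name jobs and the sample data are hypothetical stand-ins for the question's table, other and field:

// Hypothetical data: a jobid column plus two other columns, as in the question's query
// (run in spark-shell, where toDF and the implicits are already in scope)
val dataDF = Seq(("42461545", "foo", 1), ("42409245", "bar", 2)).toDF("jobid", "other", "field")
dataDF.createOrReplaceTempView("jobs")

// The UDF substitutes each jobid with its jobname
sqlContext.sql("select getJobname(jobid) as jobname, other, field from jobs").show()

Note that jobMap(id) throws a NoSuchElementException for an id that isn't in the map; getOrElse, as used in the answers below, avoids that.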

Upvotes: 0

Views: 1066

Answers (2)

1pluszara

Reputation: 1528

You can do this in many ways:

val jobnameSeq = Seq( ("42409245", "name12"),("42461545", "name4"),
                      ("42409291", "name1"),("42413872", "name3"),("42417044", "name2"))
val jobIdDF = Seq("42409245", "42409291", "42409231").toDF("jobID")
jobIdDF.createOrReplaceTempView("JobView")

Just use plain Scala's toMap function on the jobname sequence:

sqlContext.udf.register("jobNamelookUp", (jobID: String) =>  
                                            jobnameSeq.toMap.getOrElse(jobID,"null"))

//OR

If the input is an RDD, then use Spark's collectAsMap:

val jobnameMap = sc.parallelize(jobnameSeq).collectAsMap
sqlContext.udf.register("lookupJobName",(jobID:String) => 
                                            jobnameMap.getOrElse(jobID,"null"))

//OR

If this lookup is going to happen on a cluster, then you can broadcast it:

val jobnameMapBC = sc.broadcast(jobnameMap)
sqlContext.udf.register("lookupJobNameBC",(jobID:String) => 
                                                jobnameMapBC.value.getOrElse(jobID,"null")) 

spark.sql("""select jobID, jobNamelookUp(jobID) as jobNameUsingMap,
                    lookupJobNameBC(jobID) as jobNameUsingBC,
                    lookupJobName(jobID) as jobNameUsingRDDMap
             from JobView""")
    .show()

+--------+---------------+--------------+------------------+
|   jobID|jobNameUsingMap|jobNameUsingBC|jobNameUsingRDDMap|
+--------+---------------+--------------+------------------+
|42409245|         name12|        name12|            name12|
|42409291|          name1|         name1|             name1|
|42409231|           null|          null|              null|
+--------+---------------+--------------+------------------+    

As suggested by Raphael, using a broadcast join:

import org.apache.spark.sql.functions._
val jobnameSeqDF = jobnameSeq.toDF("jobID","name")
jobIdDF.join(broadcast(jobnameSeqDF), Seq("jobID"),"leftouter").show(false)

+--------+------+
|jobID   |name  |
+--------+------+
|42409245|name12|
|42409291|name1 |
|42409231|null  |
+--------+------+

Upvotes: 2

Ramesh Maharjan

Reputation: 41957

As far as I can understand from your question, you should create a Map from your sequence and look up the jobname by its id directly:

val simpleMap = jobnameSeq.toMap

println(simpleMap("42461545"))

which should give you name4
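Note that simpleMap("42461545") throws a NoSuchElementException for an id that isn't in the map. A safer variant, just as a sketch (the "unknown" default is illustrative):

println(simpleMap.getOrElse("42461545", "unknown"))  // prints name4; "unknown" for missing ids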

Now, if you want to do the same with a dataframe, you can do the following:

val jobnameDF = jobnameSeq.toDF("jobid","jobname")

val jobName = jobnameDF.filter($"jobid" === "42461545").select("jobname").first().getAs[String]("jobname")

println(jobName)

which should print name4
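One caveat: first() throws an exception if no row matches the filter. A sketch of a safe variant using take(1), which returns an empty array instead:

// take(1) yields Array[Row]; headOption turns an empty result into None rather than an error
val maybeJobName: Option[String] = jobnameDF
  .filter($"jobid" === "42461545")
  .select("jobname")
  .take(1)
  .headOption
  .map(_.getString(0))

println(maybeJobName)  // Some(name4); None if the jobid is absent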

Upvotes: 1
