Reputation: 346
I have a variable declared like this:
val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"), ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))
I want to be able to create a function usable in Spark SQL to substitute 42461545
with name4
in an SQL query.
I tried to declare this function:
val jobnameDF = jobnameSeq.toDF("jobid","jobname")
sqlContext.udf.register("getJobname", (id: String) => (
jobnameDF.filter($"jobid" === id).select($"jobname")
)
)
To be used like this in SQL:
select getjobname(jobid), other, field from table
But jobnameDF.filter($"jobid" === id).select($"jobname")
returns a DataFrame, not a string, and I can't figure out how to simply convert that value to a string, since there will only ever be one result.
If a Seq is not the right object to use in this case, I'm open to suggestions.
Edit:
Though the suggested answer works, here's exactly what I did to make this work:
// Convert my Seq to a hash map
val jobMap = jobnameSeq.toMap
// Declare a SQL function so I can use it in Spark SQL (it needs to be accessible to people who don't know Scala)
sqlContext.udf.register("getJobname", (id: String) =>
  jobMap(id)
)
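One caveat worth adding to this edit (my note, not part of the original): jobMap(id) throws a NoSuchElementException when the id is missing from the map. A minimal sketch of a safer registration using getOrElse (the "unknown" placeholder is my choice):
// getOrElse returns a placeholder instead of throwing
// NoSuchElementException for a jobid that isn't in the map.
sqlContext.udf.register("getJobname", (id: String) =>
  jobMap.getOrElse(id, "unknown")
)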
Upvotes: 0
Views: 1066
Reputation: 1528
You can do this in many ways:
val jobnameSeq = Seq(("42409245", "name12"), ("42461545", "name4"),
  ("42409291", "name1"), ("42413872", "name3"), ("42417044", "name2"))
val jobIdDF = Seq("42409245", "42409291", "42409231").toDF("jobID")
jobIdDF.createOrReplaceTempView("JobView")
Just use plain Scala's toMap
function on the jobname sequence.
sqlContext.udf.register("jobNamelookUp", (jobID: String) =>
jobnameSeq.toMap.getOrElse(jobID,"null"))
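One thing worth noting (an observation of mine, not from the original answer): jobnameSeq.toMap inside the lambda rebuilds the map on every UDF invocation. A minimal sketch that builds it once and lets the UDF close over it (re-registering the same name replaces the UDF above; the val name is mine):
// Build the map once; the UDF closes over it instead of
// converting the Seq on every call.
val jobnameSeqMap = jobnameSeq.toMap
sqlContext.udf.register("jobNamelookUp", (jobID: String) =>
  jobnameSeqMap.getOrElse(jobID, "null"))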
//OR
If the input is an RDD, use Spark's collectAsMap:
val jobnameMap = sc.parallelize(jobnameSeq).collectAsMap
sqlContext.udf.register("lookupJobName",(jobID:String) =>
jobnameMap.getOrElse(jobID,"null"))
//OR
If this lookup is going to happen on a cluster, you can broadcast
it:
val jobnameMapBC = sc.broadcast(jobnameMap)
sqlContext.udf.register("lookupJobNameBC",(jobID:String) =>
jobnameMapBC.value.getOrElse(jobID,"null"))
spark.sql("select jobID,jobNamelookUp(jobID) as jobNameUsingMap,
lookupJobNameBC(jobID) as jobNameUsingBC,
lookupJobName(jobID) as jobNameUsingRDDMap
from JobView")
.show()
+--------+---------------+--------------+------------------+
| jobID|jobNameUsingMap|jobNameUsingBC|jobNameUsingRDDMap|
+--------+---------------+--------------+------------------+
|42409245| name12| name12| name12|
|42409291| name1| name1| name1|
|42409231| null| null| null|
+--------+---------------+--------------+------------------+
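A note on the broadcast variant (my observation, not part of the original answer): without sc.broadcast, the lookup map is captured in the UDF's closure and serialized with every task, whereas a broadcast variable is shipped to each executor once and cached there. For a small map like this one the difference is negligible, but it matters as the map grows.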
As suggested by Raphael, using a broadcast join:
import org.apache.spark.sql.functions._
val jobnameSeqDF = jobnameSeq.toDF("jobID","name")
jobIdDF.join(broadcast(jobnameSeqDF), Seq("jobID"),"leftouter").show(false)
+--------+------+
|jobID |name |
+--------+------+
|42409245|name12|
|42409291|name1 |
|42409231|null |
+--------+------+
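If you want the join output to match the UDFs' "null" string rather than a SQL NULL, one possible follow-up (a sketch, using DataFrameNaFunctions):
// Replace NULLs in the joined "name" column with the string "null".
jobIdDF.join(broadcast(jobnameSeqDF), Seq("jobID"), "leftouter")
  .na.fill("null", Seq("name"))
  .show(false)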
Upvotes: 2
Reputation: 41957
As far as I can understand from your question, you should be creating a Map
from your sequence and getting the jobname for a given jobid
directly, as
val simpleMap = jobnameSeq.toMap
println(simpleMap("42461545"))
which should give you name4
Now, if you want to test with a DataFrame,
you can do the following:
val jobnameDF = jobnameSeq.toDF("jobid","jobname")
val jobName = jobnameDF.filter($"jobid" === "42461545").select("jobname").first().getAs[String]("jobname")
println(jobName)
which should print name4
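If the jobid might not be present, first() throws on an empty DataFrame; a minimal sketch of a safer lookup (the variable name is mine):
// collect().headOption yields None instead of throwing when no row matches.
val jobNameOpt = jobnameDF.filter($"jobid" === "42461545")
  .select("jobname")
  .collect()
  .headOption
  .map(_.getString(0))
println(jobNameOpt.getOrElse("not found"))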
Upvotes: 1