Jessi joseph

Reputation: 191

UDF usage in Spark

I have a custom UDF that I registered in Spark. When I try to call it from the DataFrame API, the code fails because the UDF cannot be found.

This is what I tried:

 spark.udf.register("rssi_weightage", FilterMap.rssi_weightage)
 val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight")))

The error is reported at first(rssi_weightage($"RSSI") and says that rssi_weightage is not found.

Any help will be appreciated.

Upvotes: 0

Views: 145

Answers (2)

Raphael Roth

Reputation: 27373

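This is not how you use a registered UDF. Registering only makes the name available to SQL expressions; it does not create a Scala identifier called rssi_weightage. The UDF you can call on columns is the return value of spark.udf.register, so you can do: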

val udf_rssi_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage)

val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight"))

But in your case you do not need to register the UDF at all. Just use org.apache.spark.sql.functions.udf to convert a regular function into a UDF:

val udf_rssi_weightage = udf(FilterMap.rssi_weightage)
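
To make the difference concrete, here is a minimal sketch of the three ways to reach the same function: through the value returned by spark.udf.register, through functions.udf, and through the registered name inside a SQL expression. The question does not show FilterMap.rssi_weightage, so the rssiWeightage function, its threshold, the sample rows, and the UdfSketch object are all made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, first, udf}

object UdfSketch {
  // Hypothetical stand-in for FilterMap.rssi_weightage; the real logic is not shown in the question.
  val rssiWeightage: Int => Double = rssi => if (rssi > -70) 1.0 else 0.5

  // Returns tagShortID -> (weight via register value, weight via udf, weight via SQL name).
  def run(): Map[String, (Double, Double, Double)] = {
    val spark = SparkSession.builder().master("local[1]").appName("udf-sketch").getOrCreate()
    import spark.implicits._
    try {
      // Made-up sample data with one row per tag.
      val inputData = Seq(("tag1", -60), ("tag2", -80)).toDF("tagShortID", "RSSI")

      // register returns a UserDefinedFunction that can be applied to columns.
      val registered = spark.udf.register("rssi_weightage", rssiWeightage)
      // udf wraps the same function without registering a SQL name.
      val plain = udf(rssiWeightage)

      val result = inputData.groupBy($"tagShortID").agg(
        first(registered($"RSSI")).as("viaRegister"),
        first(plain($"RSSI")).as("viaUdf"),
        // The registered name is only visible inside SQL expressions.
        first(expr("rssi_weightage(RSSI)")).as("viaSqlName")
      )

      result.collect()
        .map(r => r.getString(0) -> (r.getDouble(1), r.getDouble(2), r.getDouble(3)))
        .toMap
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

All three columns should carry the same value per tag. If you never call the function from SQL, the udf variant is the simplest, since nothing has to be registered.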

Upvotes: 2

user6860682


I suspect the problem is in how you define the UDF. The following snippet takes a slightly different approach and defines the UDF directly, inline:

import org.apache.spark.sql.functions._

val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}")))

val example = Seq("Bar", "Bazzz")
val urbf = udf { foo: String => if (example.contains(foo)) 1 else 0 }

data.select($"foo", urbf($"foo")).show

+--------+-------------+
|  foo   |UDF(foo)     |
+--------+-------------+
|  Bar   |            1|
|  Baz   |            0|
+--------+-------------+

Upvotes: 1
