Reputation: 191
I have a custom UDF registered in Spark. When I try to use that UDF, it throws an error and I am unable to access it.
I tried it like this:
spark.udf.register("rssi_weightage", FilterMap.rssi_weightage)
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(rssi_weightage($"RSSI").as("RSSI_Weight")))
The error occurs at first(rssi_weightage($"RSSI")) // rssi_weightage not found
Any help will be appreciated.
Upvotes: 0
Views: 145
Reputation: 27373
This is not how you use the UDF; the actual UDF is the return value of spark.udf.register. So you can do:
val udf_rssi_weightage = spark.udf.register("rssi_weightage", FilterMap.rssi_weightage)
val filterop = input_data.groupBy($"tagShortID", $"Timestamp", $"ListenerShortID", $"rootOrgID", $"subOrgID").agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight"))
But in your case you do not need to register the UDF at all; just use org.apache.spark.sql.functions.udf
to convert a regular function to a UDF:
val udf_rssi_weightage = udf(FilterMap.rssi_weightage)
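Putting the advice together, here is a minimal runnable sketch. Note that the question never shows `FilterMap.rssi_weightage`, so `rssiWeightage` below is a hypothetical stand-in (a plain `Double => Double` function), and the sample data is invented:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{udf, first}

object UdfSketch {
  // Hypothetical stand-in for FilterMap.rssi_weightage; the real
  // implementation is not shown in the question.
  val rssiWeightage: Double => Double = rssi => if (rssi > -60.0) 1.0 else 0.5

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // No registration needed when the udf is only used from the DataFrame API;
    // spark.udf.register is only required for SQL-string queries.
    val udf_rssi_weightage = udf(rssiWeightage)

    val input_data = Seq(("tag1", -55.0), ("tag2", -70.0)).toDF("tagShortID", "RSSI")
    input_data
      .groupBy($"tagShortID")
      .agg(first(udf_rssi_weightage($"RSSI")).as("RSSI_Weight"))
      .show()

    spark.stop()
  }
}
```

The key point is that the `Column`-valued function you call inside `agg` is the value returned by `udf(...)` (or by `spark.udf.register(...)`), not the bare name string you registered.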
Upvotes: 2
Reputation:
I suppose you have an issue with the way you're defining the udf function. The next snippet takes a slightly different approach to declaring the udf - it is a directly defined function:
import org.apache.spark.sql.functions._
val data = sqlContext.read.json(sc.parallelize(Seq("{'foo' : 'Bar'}", "{'foo': 'Baz'}")))
val example = Seq("Bar", "Bazzz")
val urbf = udf { foo: String => if (example.contains(foo)) 1 else 0 }
data.select($"foo", urbf($"foo")).show
+----+--------+
| foo|UDF(foo)|
+----+--------+
| Bar|       1|
| Baz|       0|
+----+--------+
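As a side note, for a simple membership check like this you may not need a udf at all: the built-in `Column.isin` combined with `when`/`otherwise` expresses the same logic, and because these are native Catalyst expressions Spark can optimize them, whereas a udf is opaque to the optimizer. A minimal sketch (same invented data as above):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.when

object IsinSketch {
  // Pure-Scala equivalent of the udf's logic, kept separate so it is easy to check.
  def flag(example: Seq[String])(v: String): Int =
    if (example.contains(v)) 1 else 0

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val example = Seq("Bar", "Bazzz")
    val data = Seq("Bar", "Baz").toDF("foo")

    // isin takes varargs, hence the `example: _*` splat.
    data.select($"foo", when($"foo".isin(example: _*), 1).otherwise(0).as("flag")).show()

    spark.stop()
  }
}
```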
Upvotes: 1