Reputation: 505
I have a Properties variable that I populate by calling a REST service. I need to pass these properties to my UDFs. Since the Properties list can be long, I thought a broadcast variable would serve my purpose well, so I created one in my main class:
Properties kp = getApplicationProperties(rootPath);
Broadcast<Properties> brVar = sc.broadcast(kp);
However, how do I pass this variable into my UDFs? I tried passing it as a literal using org.apache.spark.sql.functions.lit, but that led to my UDF never being called:
spark.sqlContext().udf().registerJava("MongoInsert", MongoInsert.class.getName(), DataTypes.StringType);

persondatasetwithResid.select(callUDF("MongoInsert",
        lit(rootPath).cast(DataTypes.StringType),
        col("value").cast(DataTypes.StringType),
        col("resourceId").cast(DataTypes.StringType),
        lit(brVar))).show();
public class MongoInsert implements UDF4<String, String, String, Broadcast<Properties>, String> {

    public String call(String rootPath, String jsonstring, String resourceId, Broadcast<Properties> brVar)
            throws Exception {
        Properties kp = brVar.value();
        // ... rest of the method omitted ...
        return null;
    }
}
I cannot find any resource that explains how to pass broadcast variables to UDFs in Java. Please help me.
Indications across the web suggest that only columns and literal values can be passed into UDFs. Is this so? Can other variables like maps, arrays, etc. not be passed in? Remember, I am talking about Spark with Java; in Scala there seems to be a lot more flexibility.
I am also finding a lot of literature pointing to something called typedLit, which apparently does let us work with Maps and Arrays, e.g. the following question:
How to add a Map column to Spark dataset?
Does this mean that wrapping the variable in lit() won't serve my purpose? I tried wrapping a simple Map variable with typedLit(), but it gives me a compilation error:

"The method typedLit(T, TypeTags.TypeTag<T>) in the type functions is not applicable for the arguments (Map<String,String>)"
And, quite predictably, there is a plethora of resources on the web about how to use typedLit in Scala, but next to nothing for Spark with Java.
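From what I can tell, one Java-friendly workaround for map values would be to build the map column directly with functions.map, which sidesteps the TypeTag parameter that typedLit wants. A minimal sketch (the keys and values here are just placeholders):

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.map;

import org.apache.spark.sql.Column;

// Build a MapType column from alternating key/value literal columns;
// no TypeTag is needed, unlike typedLit.
Column propsCol = map(
        lit("host"), lit("localhost"),
        lit("port"), lit("27017"));

// The resulting column can then be passed to a UDF like any other column.

But this only covers literal maps, not an arbitrary Properties object.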
I found another question:
how to set and get static variables from spark?
This one could also answer my question, since a static variable visible to all classes could serve my purpose. The answer specifies broadcast variables again, but also mentions closures. Once again, there is no example of such closures being used in Java, not even in the official Spark documentation! If someone could show me how to create a closure in Java and pass a variable to UDFs that way, it would greatly help me.
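For what it's worth, my best guess at the closure approach is a serializable lambda that captures the variable, something like the sketch below (untested; I am not sure this is what the answer means):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import java.util.Properties;

SparkSession spark = SparkSession.builder().getOrCreate();
Properties kp = getApplicationProperties(rootPath); // java.util.Properties is Serializable

// The lambda closes over kp; since UDF1 extends Serializable,
// the captured copy is shipped to the executors with the closure.
spark.udf().register("lookupProp",
        (UDF1<String, String>) key -> kp.getProperty(key),
        DataTypes.StringType);

Is something like this valid, or does the captured variable have to be a broadcast?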
Upvotes: 1
Views: 3148
Reputation: 2178
Here is an example where the broadcast variable is accessed at the class level rather than being passed into the UDF as an argument:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import java.io.Serializable;

// Serializable, because the anonymous UDF below references the
// enclosing instance and must be shipped to the executors with it.
public class Test implements Serializable {

    Broadcast<String> broadcastVar = null;

    // The UDF reads the broadcast variable from the enclosing instance,
    // so nothing extra has to be passed in as a column.
    public UDF1<String, String> myudf = new UDF1<String, String>() {
        @Override
        public String call(String x) {
            return broadcastVar.getValue();
        }
    };

    public void setBroadcastVar(Broadcast<String> broadcastVar) {
        this.broadcastVar = broadcastVar;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("test").getOrCreate();
        JavaSparkContext js = new JavaSparkContext(spark.sparkContext());

        Test t = new Test();
        t.setBroadcastVar(js.broadcast("hello world"));
        spark.udf().register("myudf", t.myudf, DataTypes.StringType);
        // use the udf
    }
}
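Once registered, the UDF can be exercised with a plain SQL call, for example (the argument is ignored here, since the UDF just returns the broadcast value):

// Expected to show a single row containing "hello world".
spark.sql("SELECT myudf('anything')").show();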
Upvotes: 0