Reputation: 161
I'm doing something like pattern matching in a Spark Streaming app. What I want is to update a variable like a broadcast variable, except mutable. Is there a way to do that? Any advice?
EDIT
Sorry for not being so clear. I am doing some CEP stuff on logs. I need to load the rules from Elasticsearch while the Spark application is running, and I want to apply these rules on the worker side (on each RDD).
Upvotes: 1
Views: 1974
Reputation: 513
The idea here is to write a wrapper over the broadcast variable that gets refreshed periodically. The catch is to call this function inside transform (or any other variation) which allows RDD-to-RDD operations, so the refresh runs on the driver before each batch.
Code snippet for the BroadcastWrapper class:
import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    // The current broadcast variable and the time it was last refreshed
    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null)
                broadcastVar.unpersist();
            lastUpdatedAt = new Date(System.currentTimeMillis());
            // Your logic to refresh, e.g. reload the rules from Elasticsearch
            ReferenceData data = getRefData();
            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}
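Stripped of the Spark types, the gate inside `updateAndGet` is just a timestamp comparison around a fetch. A minimal sketch of that pattern in plain Java (`fetchRules` here is a hypothetical stand-in for whatever reloads your reference data, e.g. the Elasticsearch query):

```java
import java.util.function.Supplier;

// Minimal sketch of the time-based refresh gate used in updateAndGet,
// with no Spark dependency. The value is re-fetched only when it is
// missing or older than refreshIntervalMs; otherwise the cached copy
// is returned.
public class RefreshingValue<T> {
    private final Supplier<T> fetchRules;   // stand-in for the reload logic
    private final long refreshIntervalMs;
    private T value;
    private long lastUpdatedAt;

    public RefreshingValue(Supplier<T> fetchRules, long refreshIntervalMs) {
        this.fetchRules = fetchRules;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    public synchronized T updateAndGet() {
        long now = System.currentTimeMillis();
        if (value == null || now - lastUpdatedAt > refreshIntervalMs) {
            value = fetchRules.get();   // refresh from the source
            lastUpdatedAt = now;
        }
        return value;
    }
}
```

In the Spark version, the refresh branch additionally unpersists the old broadcast and creates a new one, since `Broadcast` values themselves are immutable.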
To use this method we can do something like this (note that transform takes an RDD and must return one):
objectStream.transform(rdd -> {
    Broadcast<ReferenceData> refdataBroadcast =
            BroadcastWrapper.getInstance().updateAndGet(rdd.context());
    /* Your code here: use refdataBroadcast.value() inside RDD operations */
    return rdd;
});
Please see my answer on another thread for more detail: https://stackoverflow.com/a/41259333/3166245
Upvotes: 2