bfaskiplar

Reputation: 885

Periodic Broadcast in Apache Spark Streaming

I am implementing a stream learner for text classification. My implementation has some single-valued parameters that need to be updated as new stream items arrive. For example, I want to change the learning rate as new predictions are made. However, I doubt there is a way to broadcast variables after the initial broadcast. So what happens if I need to broadcast a variable every time I update it? If there is a way to do that, or a workaround for what I want to accomplish in Spark Streaming, I'd be happy to hear about it.

Thanks in advance.

Upvotes: 6

Views: 3698

Answers (5)

Aastha

Reputation: 513

I got this working by creating a wrapper class over the broadcast variable. The wrapper's updateAndGet method returns the refreshed broadcast variable. I call this function inside dStream.transform, as allowed by the Spark documentation:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

Transform Operation states: "the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches."

The BroadcastWrapper class will look like this:

import java.util.Calendar;
import java.util.Date;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastWrapper {

    // the wrapped broadcast variable and the time it was last refreshed
    private Broadcast<ReferenceData> broadcastVar;
    private Date lastUpdatedAt = Calendar.getInstance().getTime();

    private static BroadcastWrapper obj = new BroadcastWrapper();

    private BroadcastWrapper() {}

    public static BroadcastWrapper getInstance() {
        return obj;
    }

    public JavaSparkContext getSparkContext(SparkContext sc) {
        return JavaSparkContext.fromSparkContext(sc);
    }

    public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
        Date currentDate = Calendar.getInstance().getTime();
        long diff = currentDate.getTime() - lastUpdatedAt.getTime();
        if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
            if (broadcastVar != null) {
                broadcastVar.unpersist();
            }
            lastUpdatedAt = new Date(System.currentTimeMillis());

            // your logic to refresh the reference data
            ReferenceData data = getRefData();

            broadcastVar = getSparkContext(sparkContext).broadcast(data);
        }
        return broadcastVar;
    }
}

You can then call updateAndGet inside the stream.transform method, which allows RDD-to-RDD transformations:

objectStream.transform(rdd -> {

    Broadcast<ReferenceData> broadcastVar = BroadcastWrapper.getInstance().updateAndGet(rdd.context());

    /** Your code to manipulate the RDD using broadcastVar.value() **/
    return rdd;
});

Refer to my full answer in this post: https://stackoverflow.com/a/41259333/3166245

Hope it helps.

Upvotes: 4

mseaspring

Reputation: 21

// remove the stale broadcast from the executors and destroy it on the driver
bkc.unpersist(true)
bkc.destroy()
// re-broadcast the refreshed data and read back the new value
bkc = sc.broadcast(tableResultMap)
bkv = bkc.value

You may try this, though I cannot guarantee that it is effective.
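
As a rough illustration only (not part of the answer above), the same sequence could be driven from the driver once per batch. This sketch assumes a DStream called stream, the SparkContext sc, and two hypothetical helpers, loadTableResultMap() and process(), standing in for your own refresh and per-record logic:

var bkc = sc.broadcast(loadTableResultMap()) // loadTableResultMap() is a hypothetical refresh function

stream.foreachRDD { rdd =>
  // refresh the broadcast on the driver before processing this batch
  // (a real job would usually guard this with a time or change check)
  bkc.unpersist(true)
  bkc = sc.broadcast(loadTableResultMap())
  val current = bkc // capture the refreshed reference for the executor closure
  rdd.foreach(record => process(record, current.value)) // process() is a hypothetical per-record function
}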

Upvotes: 2

Joey.Chang

Reputation: 164

It's an ugly hack, but it worked! We can see how a broadcast value is fetched from a broadcast object just by its broadcast id: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114

So I periodically re-broadcast through the same broadcast id:

val broadcastFactory = new TorrentBroadcastFactory()
broadcastFactory.unbroadcast(BroadcastId, true, true)
// append some ids to initIds
val broadcastcontent = broadcastFactory.newBroadcast[Set[String]](initIds, false, BroadcastId)

And I can get BroadcastId from the first broadcast value:

val ids = ssc.sparkContext.broadcast(initIds)
// broadcast id
val BroadcastId = ids.id

Then the workers use ids as a normal Broadcast value:

def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ???

Upvotes: 1

mvogiatzis

Reputation: 612

It is best that you collect the data to the driver and then broadcast it to all nodes.

Use DStream#foreachRDD to collect the computed RDDs at the driver and, once you know when you need to change the learning rate, use SparkContext#broadcast(value) to send the new value to all nodes.

I would expect the code to look something like the following:

dStreamContainingBroadcastValue.foreachRDD { rdd =>
  val valueToBroadcast = rdd.collect()
  sc.broadcast(valueToBroadcast)
}

You may also find this thread from the Spark user mailing list useful. Let me know if that works.

Upvotes: 0

Sujee Maniyam

Reputation: 1103

My understanding is that once a broadcast variable is initially sent out, it is 'read-only'. I believe you can update the broadcast variable on the local node, but not on remote nodes.

Maybe you need to consider doing this 'outside Spark'. How about using a NoSQL store (Cassandra, etc.) or even Memcached? You can then update the variable from one task and periodically check the store from the other tasks.
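
A minimal sketch of that idea, where ParameterStore is a hypothetical stand-in for a real store client (Cassandra, Memcached, Redis, ...) reachable from every executor; the parameter is re-read once per partition, so an updated value takes effect on the next batch without any re-broadcast:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical client for an external key-value store; replace the body with a
// real lookup against a store that every executor can reach.
object ParameterStore {
  def currentLearningRate(): Double = 0.05 // placeholder value
}

// Re-read the parameter once per partition so that a value written to the store
// by the driver (or any other process) is picked up on the next batch.
def scaleByCurrentRate(stream: DStream[(String, Double)]): DStream[(String, Double)] =
  stream.mapPartitions { records =>
    val lr = ParameterStore.currentLearningRate()
    records.map { case (id, score) => (id, score * lr) }
  }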

Upvotes: 2
