Reputation: 2417
In my Spark Streaming application, I want to map a value based on a dictionary that's retrieved from a backend (Elasticsearch). I want to refresh the dictionary periodically, in case it is updated in the backend, similar to the periodic refresh capability of Logstash's translate filter. How could I achieve this with Spark (e.g. somehow unpersist the RDD every 30 seconds)?
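For illustration, the kind of enrichment I mean, with hypothetical names (loadDictionaryFromES and eventStream are placeholders, not a real API):

// Illustrative sketch only: the dictionary is loaded once at startup today,
// and I want it re-fetched periodically while the stream keeps running.
val dictionary: RDD[(String, String)] = loadDictionaryFromES()
val enriched = eventStream.transform(rdd => rdd.join(dictionary))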
Upvotes: 8
Views: 4267
Reputation: 351
Steps:
Optionally, a Hive table repair via Spark can be added to the init step:
spark.sql("msck repair table tableName")
import java.time.LocalDateTime

var caching_data = Data.init()
caching_data.persist()

var currTime = LocalDateTime.now()
var cacheClearTime = currTime.plusMinutes(30) // choose your own refresh interval

// dstream is the application's input DStream
dstream.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    currTime = LocalDateTime.now()
    // refresh the cache once the deadline has passed
    if (cacheClearTime.isBefore(currTime)) {
      caching_data.unpersist(true) // blocking = true: wait until the blocks are removed
      caching_data = Data.init()
      caching_data.persist()
      currTime = LocalDateTime.now()
      cacheClearTime = currTime.plusMinutes(30)
    }
  }
}
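For completeness, here's a minimal sketch of what the Data.init() placeholder above could look like; the table name, the columns, and the spark session in scope are assumptions. The point is only that each call builds a fresh RDD and therefore picks up backend updates:

import org.apache.spark.rdd.RDD

// Hypothetical Data.init(): re-reads the reference table on every call and
// returns a fresh key/value pair RDD for caching.
object Data {
  def init(): RDD[(String, String)] =
    spark.sql("SELECT key, value FROM tableName") // assumed table and columns
      .rdd
      .map(row => (row.getString(0), row.getString(1)))
}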
Upvotes: 0
Reputation: 37435
The best way I've found to do that is to recreate the RDD and maintain a mutable reference to it. Spark Streaming is, at its core, a scheduling framework on top of Spark, so we can piggyback on the scheduler to have the RDD refreshed periodically. For that, we use an empty DStream that we schedule only for the refresh operation:
def getData(): RDD[Data] = ??? // function to create the RDD we want to use as reference data
val dstream = ??? // our data stream

// a dstream of empty data, fired at the desired refresh interval
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq()))
  .window(Seconds(refreshInterval), Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()

refreshDstream.foreachRDD { _ =>
  // evict the old RDD from memory and recreate it
  referenceData.unpersist(true)
  referenceData = getData()
  referenceData.cache()
}

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
In the past, I've also tried interleaving only cache() and unpersist() calls, with no result (it refreshes only once). Recreating the RDD removes all lineage and provides a clean load of the new data.
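Note that dstream.transform re-evaluates its closure on every batch interval, which is why the reassignment of the referenceData var is picked up by subsequent batches. For the Elasticsearch backend from the question, getData() could look roughly like this; a sketch assuming the elasticsearch-hadoop Spark connector, a hypothetical dictionary/entry index, and a pair-RDD element type in place of the RDD[Data] placeholder:

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // elasticsearch-hadoop connector

// Each call reads the index again, so the returned RDD reflects backend updates.
// "dictionary/entry", "key" and "value" are assumed names.
def getData(): RDD[(String, String)] =
  sc.esRDD("dictionary/entry").map { case (_, doc) =>
    (doc("key").toString, doc("value").toString)
  }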
Upvotes: 12