lairtech

Reputation: 2417

Spark Streaming: How to periodically refresh cached RDD?

In my Spark Streaming application, I want to map values based on a dictionary that's retrieved from a backend (ElasticSearch). I want to refresh the dictionary periodically, in case it was updated in the backend. It would be similar to the periodic refresh capability of Logstash's translate filter. How could I achieve this with Spark (e.g. somehow unpersist the RDD every 30 seconds)?
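The kind of setup I have in mind, roughly (the ElasticSearch query is simplified to a placeholder, and `sc` / `events` stand for my SparkContext and input stream of (key, value) pairs):

import org.apache.spark.rdd.RDD

// placeholder for the ElasticSearch query that returns the dictionary
def fetchDictionary(): Seq[(String, String)] = ???

// the dictionary cached as a keyed RDD so it can be joined against the stream
var dictionary: RDD[(String, String)] = sc.parallelize(fetchDictionary())
dictionary.cache()

val translated = events.transform(rdd => rdd.join(dictionary))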

Upvotes: 8

Views: 4267

Answers (2)

Vinayak Mishra

Reputation: 351

Steps:

  1. Cache once before starting the stream
  2. Clear the cache after a certain period (the example below uses 30 minutes)

Optional: a Hive table repair via Spark can be added to the init step:

spark.sql("msck repair table tableName")

import java.time.LocalDateTime

var caching_data = Data.init()   // load the reference data once before the stream starts
caching_data.persist()

var currTime = LocalDateTime.now()
var cacheClearTime = currTime.plusMinutes(30) // adjust the refresh interval to your needs

// `dstream` is the application's input DStream
dstream.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    currTime = LocalDateTime.now()
    if (cacheClearTime.isBefore(currTime)) {
      // evict the old data and cache a fresh copy
      caching_data.unpersist(true) // blocking = true
      caching_data = Data.init()
      caching_data.persist()
      cacheClearTime = LocalDateTime.now().plusMinutes(30)
    }
  }
}
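Data.init() itself isn't shown here; a minimal sketch, assuming the reference data lives in the Hive table mentioned in the optional repair step (the table and column names are placeholders):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Data {
  // Hypothetical loader for the cached reference data; adjust table and columns to your schema.
  def init(): RDD[(String, String)] = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("msck repair table tableName") // optional: pick up newly added partitions
    spark.sql("select key, value from tableName")
      .rdd
      .map(row => (row.getString(0), row.getString(1)))
  }
}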

Upvotes: 0

maasg

Reputation: 37435

The best way I've found to do that is to recreate the RDD and maintain a mutable reference to it. Spark Streaming is, at its core, a scheduling framework on top of Spark. We can piggy-back on the scheduler to have the RDD refreshed periodically. For that, we use an empty DStream that we schedule only for the refresh operation:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ConstantInputDStream

def getData(): RDD[Data] = ??? // function to create the RDD we want to use as reference data
val dstream = ??? // our data stream

// a dstream of empty data, scheduled once per refresh interval
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq()))
  .window(Seconds(refreshInterval), Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ => 
    // evict the old RDD from memory and recreate it
    referenceData.unpersist(true)
    referenceData = getData()
    referenceData.cache()
}

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...

In the past, I've also tried interleaving only cache() and unpersist(), with no result (it refreshes only once). Recreating the RDD removes all lineage and provides a clean load of the new data.
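For the dictionary use case in the question, getData() just needs to return a keyed RDD so the join above works. A minimal sketch of the wiring the snippet assumes (ssc, sparkContext and refreshInterval are not defined above; the backend fetch is a placeholder, since the ElasticSearch query is client-specific):

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DictionaryRefresh")
val ssc = new StreamingContext(conf, Seconds(5)) // batch interval
val sparkContext = ssc.sparkContext
val refreshInterval = 30 // seconds between dictionary reloads

// Placeholder: fetch the current dictionary snapshot from the backend
// (e.g. an ElasticSearch query) as (key, translation) pairs.
def fetchDictionary(): Seq[(String, String)] = ???

def getData(): RDD[(String, String)] = sparkContext.parallelize(fetchDictionary())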

Upvotes: 12
