Wanchun

Reputation: 165

How to lazily build a cache from spark streaming data

I am running a streaming job and want to build a lookup map incrementally (for example, to track unique items and filter out duplicate arrivals). Initially I was thinking of keeping one DataFrame in cache and unioning it with the new DataFrame created in each batch, something like this:

 items.foreachRDD((rdd: RDD[String]) => {
     ...
     val uf = rdd.toDF
     cached_df = cached_df.unionAll(uf)   // append this batch's rows
     cached_df.cache
     cached_df.count                      // materialize the cache
     ...
 })

My concern is that cached_df seems to remember the lineage of every RDD appended in previous batch iterations. In my case, if I don't care about recomputing this cached RDD after a crash, is maintaining that ever-growing DAG an overhead?

As an alternative, at the beginning of each batch I load the lookup from a Parquet file instead of keeping it in memory, and at the end of each batch I append the new data to the same Parquet file:

 noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
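
Roughly, the per-batch flow looks like this (just a sketch; the "item" column name, the sqlContext, and the except-based dedup are for illustration only):

 import org.apache.spark.sql.SaveMode

 items.foreachRDD { (rdd: RDD[String]) =>
     import sqlContext.implicits._
     val incoming = rdd.toDF("item")
     // load the lookup table written by earlier batches
     val lookup = sqlContext.read.parquet("lookup")
     // keep only the items not seen before
     val noDuplicatedDF = incoming.except(lookup).cache()
     noDuplicatedDF.count()   // materialize before appending to the same path
     // ... use noDuplicatedDF in this batch ...
     noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
 }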

This works as expected, but is there a more straightforward way to keep the lookup in memory?

Thanks Wanchun

Upvotes: 2

Views: 646

Answers (1)

Tathagata Das

Reputation: 1808

Appending to Parquet is definitely the right approach. However, you could optimize the lookup. If you are okay with the in-memory cache being slightly stale (that is, not containing the very latest data), then you could periodically (say, every 5 minutes) load the current "lookup" Parquet table into memory (assuming it fits). All lookup queries would then go against the latest 5-minute snapshot.

You could also pipeline the loading into memory and the serving of queries in different threads.
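
One way this could look (just a sketch; the sqlContext, the "lookup" path, the 5-minute interval, and the AtomicReference-based swap are illustrative):

 import java.util.concurrent.{Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import org.apache.spark.sql.DataFrame

 // Load and materialize the initial snapshot of the lookup table.
 val initial = sqlContext.read.parquet("lookup").cache()
 initial.count()
 val lookupSnapshot = new AtomicReference[DataFrame](initial)

 // Refresh the snapshot on a background thread every 5 minutes.
 val refresher = Executors.newSingleThreadScheduledExecutor()
 refresher.scheduleAtFixedRate(new Runnable {
     def run(): Unit = {
         val fresh = sqlContext.read.parquet("lookup").cache()
         fresh.count()                             // materialize before swapping
         val old = lookupSnapshot.getAndSet(fresh)
         old.unpersist()                           // drop the stale snapshot
     }
 }, 5, 5, TimeUnit.MINUTES)

 // Lookup queries then read whatever snapshot is current, e.g.
 // someBatchDF.join(lookupSnapshot.get(), "item")

Because the reference swap is atomic, queries never see a half-loaded table; they simply keep using the previous snapshot until the new one is materialized.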

Upvotes: 1
