Reputation: 165
I am running a streaming job and want to build a lookup map incrementally (for example, to track unique items and filter out duplicate incoming records). Initially I was thinking of keeping one DataFrame
in cache and unioning it with the new DataFrame
created in each batch, something like this:
items.foreachRDD((rdd: RDD[String]) => {
  ...
  val uf = rdd.toDF
  cached_df = cached_df.unionAll(uf)  // cached_df is a var kept across batches
  cached_df.cache
  cached_df.count                     // materialize the cache
  ...
})
My concern is that cached_df seems to remember the lineage of every RDD
appended in every batch iteration. In my case, since I don't care about recomputing this cached RDD
if it crashes, is maintaining that ever-growing DAG an overhead?
As an alternative, at the beginning of each batch I load the lookup from a Parquet file instead of keeping it in memory, and at the end of each batch I append the new RDD
to the same Parquet file:
noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
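In full, each batch roughly does the following (a simplified sketch; I'm assuming Spark 2.x with a SparkSession called spark, a single column named item, and a left_anti join to drop the duplicates):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode

items.foreachRDD { (rdd: RDD[String]) =>
  import spark.implicits._
  val incoming = rdd.toDF("item")

  // load the lookup built up by previous batches
  // (the "lookup" path is seeded once before the job starts)
  val lookup = spark.read.parquet("lookup")

  // keep only items that are not in the lookup yet
  val noDuplicatedDF = incoming.join(lookup, Seq("item"), "left_anti")

  // persist the new unique items for the next batch
  noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
}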
This works as expected, but is there a straightforward way to keep the lookup in memory?
Thanks, Wanchun
Upvotes: 2
Views: 646
Reputation: 1808
Appending to Parquet is definitely the right approach. However, you could optimize the lookup. If you are okay with the in-memory cache being slightly stale (that is, not including the most recent seconds of data), then you could periodically (say, every 5 minutes) load the current "lookup" Parquet table into memory (assuming it fits), and all lookup queries would hit that latest 5-minute snapshot.
You could also pipeline the loading into memory and the serving of queries in separate threads.
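A minimal sketch of that pattern (assuming a SparkSession named spark, a lookup column named item, and the 5-minute interval mentioned above; these names are illustrative, not from your code):

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.sql.DataFrame

// snapshot currently served to lookup queries
val snapshot = new AtomicReference[DataFrame](spark.read.parquet("lookup").cache())

// background thread that refreshes the snapshot every 5 minutes
val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  def run(): Unit = {
    val fresh = spark.read.parquet("lookup").cache()
    fresh.count()                        // materialize before swapping it in
    val old = snapshot.getAndSet(fresh)  // queries see the new snapshot from here on
    old.unpersist()                      // drop the previous snapshot from memory
  }
}, 5, 5, TimeUnit.MINUTES)

// lookup queries always join against whatever snapshot is current
def withoutKnownItems(batch: DataFrame): DataFrame =
  batch.join(snapshot.get(), Seq("item"), "left_anti")

Since the refresh happens on its own thread, the streaming batches never block on the reload; they only pay the cost of an atomic reference read.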
Upvotes: 1