Marc Le Bihan

Reputation: 3294

What is the best way to "cache" Spark datasets between requests?

I have a Spark program that begins by building the network of France (cities, local authorities...) in a dataset for a given year. That dataset is then used for other operations: local accounting, searching among enterprises, etc.

In terms of business rules, the dataset is rather hard to create: many filters, many kinds of checks, and I don't know in advance how the caller who asks for it will use it. But most of the time, the caller asks for the dataset for the year 2019, because all they need is "all the cities existing in France today".

My program below succeeds in returning results for 2019. When the next caller also asks for the cities of 2019, Spark redoes all the work it did before...

What is the optimization principle here?

Should I store in my program, at the same level where I keep the Spark session I use for requests and building, something like a Map<Integer, Dataset> where the key is the year and the value is the dataset that at least one caller has already asked for that year?
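
For illustration, here is a minimal sketch of that idea in Java; the FranceNetworkCache class and the buildFranceNetwork(year) method are hypothetical names standing in for the expensive construction described above:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FranceNetworkCache {

    private final SparkSession spark;

    // One materialized dataset per requested year.
    private final Map<Integer, Dataset<Row>> byYear = new ConcurrentHashMap<>();

    public FranceNetworkCache(SparkSession spark) {
        this.spark = spark;
    }

    public Dataset<Row> franceNetwork(int year) {
        // Build the dataset only the first time a year is requested.
        return byYear.computeIfAbsent(year, y -> {
            Dataset<Row> ds = buildFranceNetwork(y).cache(); // keep blocks on the executors
            ds.count();                                      // force materialization now
            return ds;
        });
    }

    // Placeholder for the filters and checks described in the question.
    private Dataset<Row> buildFranceNetwork(int year) {
        return spark.emptyDataFrame();
    }
}

Such a map only helps as long as the same driver and SparkSession serve the successive requests; across separate spark-submit runs you would need one of the persistence approaches described in the answers below.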

Upvotes: 3

Views: 424

Answers (3)

Ged

Reputation: 18003

Assuming a spark-shell or a compiled Spark program that runs in the same session and picks up requests:

  1. Use IGNITE, or
  2. Rely on 'skipped stages' effect (using .cache for DFs as well).

The latter is shown by example against RDDs, but DataFrames have RDDs underlying them:

val d = sc.parallelize(0 until 100000).map(i => (i % 10000, i)).cache // cached or not does not matter for RDDs; for DF/DS it does

val c = d.rightOuterJoin(d.reduceByKey(_ + _))
val f = d.leftOuterJoin(d.reduceByKey(_ + _))

c.count
c.collect // skipped stages, shuffle output reused
f.count
f.collect // skipped stages, shuffle output reused

val g = f.filter(e => e._1 % 2 == 0)
val h = f.filter(e => e._1 == 657)
val j = f.filter(e => e._1 == 1657)

g.collect
h.collect
j.collect // these skipped as well

A trivial example, but it shows Spark reusing shuffle output, meaning some work need not be done again. How much this helps depends on your use cases and on how you read the data initially, is my take.

Note the skipped stages in the Spark UI; recomputation is thus not always as bad as one may think. In some cases your "caching" is achieved this way.

For actions that need different processing, at least the underlying (intermediate) sources need .cache or .persist.

(Screenshot: Spark UI showing skipped stages.)

If a new spark-submit is used:

  1. use IGNITE, or
  2. Re-use the checkpoint directory, although this is quite convoluted; see Spark Checkpointing Non-Streaming - Checkpoint files can be used in subsequent job run or driver program. It is only really worthwhile if multiple actions are possible on the shuffled RDD that is re-read, otherwise the effect is not so great. Or
  3. Use a good initial query and do a bucketBy save and re-read; see https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4861715144695760/2994977456373837/5701837197372837/latest.html. Particularly handy in the case of sorting. A sketch follows this list.
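
For option 3, a minimal sketch of the bucketBy save-and-re-read in Java, assuming a Hive-compatible metastore is available; the source path, column name, bucket count and table name are made up for the example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class BucketedSaveExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate();

        // First run: do the expensive initial query once, then bucket and save it.
        Dataset<Row> cities = spark.read().parquet("/data/france/raw"); // assumed source path

        cities.write()
              .bucketBy(48, "departmentCode")      // bucket count is arbitrary here
              .sortBy("departmentCode")
              .mode(SaveMode.Overwrite)
              .saveAsTable("france_cities_2019");  // requires a Hive-compatible metastore

        // A later spark-submit re-reads the bucketed table instead of recomputing it;
        // joins and aggregations on departmentCode can then avoid a full shuffle.
        Dataset<Row> reread = spark.table("france_cities_2019");
        reread.show();
    }
}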

Upvotes: 0

Shridhar

Reputation: 56

You would have to save the dataset to HDFS (or whatever other store you are using) and load it whenever required, instead of recomputing the entire dataset each time. This is more about how you design your application. These datasets could probably be precomputed for certain recent years as part of data preparation, so they are always ready to use. This assumes the next run is triggered as a new job, e.g. a job running once a day.
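
As a minimal sketch of that design in Java (the HDFS path and the buildFranceNetwork helper are assumptions for the example): a preparation job writes the dataset once, and later jobs just reload it.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class PrecomputeFranceNetwork {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        int year = 2019;

        // Preparation job (e.g. once a day): compute and persist the dataset.
        Dataset<Row> network = buildFranceNetwork(spark, year); // hypothetical expensive build
        network.write()
               .mode(SaveMode.Overwrite)
               .parquet("hdfs:///warehouse/france_network/year=" + year);

        // Any later job: reload instead of recomputing.
        Dataset<Row> reloaded = spark.read()
                                     .parquet("hdfs:///warehouse/france_network/year=" + year);
        reloaded.show();
    }

    // Placeholder for the real construction logic.
    private static Dataset<Row> buildFranceNetwork(SparkSession spark, int year) {
        return spark.emptyDataFrame();
    }
}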

Upvotes: 1

Sandhya

Reputation: 108

Redis is the best choice to use with Spark. Store the results in Redis, and for the next request just read them back from Redis.
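
For illustration, a sketch in Java assuming the spark-redis connector is on the classpath; the format name and options follow the spark-redis documentation as I recall it, so verify them against the connector version you use, and the Redis host, source path and table name are made up for the example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class RedisCacheExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.redis.host", "localhost") // assumed Redis location
                .getOrCreate();

        Dataset<Row> cities2019 = spark.read().parquet("/data/france/cities_2019"); // assumed source

        // Store the computed result in Redis under a logical table name.
        cities2019.write()
                  .format("org.apache.spark.sql.redis")
                  .option("table", "france_cities_2019")
                  .mode(SaveMode.Overwrite)
                  .save();

        // A later request reads it back from Redis instead of recomputing it.
        Dataset<Row> fromRedis = spark.read()
                                      .format("org.apache.spark.sql.redis")
                                      .option("table", "france_cities_2019")
                                      .load();
        fromRedis.show();
    }
}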

Upvotes: 0
