Reputation: 3294
I have a Spark program that starts by building the network of France (cities, local authorities...) as a dataset for a given year. That dataset is then used for other operations: local accounting, searching among enterprises, etc.
In terms of business rules the dataset is rather hard to create: many filters, many kinds of checks, and I don't know in advance how the caller who asks for it will use it. But most of the time the caller asks for the dataset of the year 2019, because he only needs "all the cities existing in France today".
My program succeeds in returning results for 2019.
But when the next caller also asks for the cities of 2019, Spark restarts the whole work it did before...
What is the principle of optimization here?
Should I store in my program, at the same level where I keep the Spark session used for requests and building, something like a Map<Integer, Dataset> where the key is the year and the value is the dataset that at least one caller has already asked for that year?
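Something like this minimal sketch is what I have in mind (written in Scala here; DatasetProvider and buildNetwork are just placeholders for my real code):

import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.concurrent.TrieMap

// Lives next to the SparkSession: keeps one dataset per year, built on first request.
class DatasetProvider(spark: SparkSession, buildNetwork: (SparkSession, Int) => DataFrame) {
  private val byYear = TrieMap.empty[Int, DataFrame]  // year -> dataset at least one caller asked for

  def networkFor(year: Int): DataFrame =
    byYear.getOrElseUpdate(year, buildNetwork(spark, year).cache())
}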
Upvotes: 3
Views: 424
Reputation: 18003
Assuming a spark-shell or a compiled Spark program that runs in the same session, picking up requests:
- Use IGNITE, or
- Rely on the 'skipped stages' effect (using .cache for DFs as well).
The latter is shown below by example against RDDs, but DFs and DSs have RDDs underlying them:
val d = sc.parallelize(0 until 100000).map(i => (i % 10000, i)).cache // cached or not: for an RDD it matters little here, for a DF/DS it does
val c = d.rightOuterJoin(d.reduceByKey(_ + _))
val f = d.leftOuterJoin(d.reduceByKey(_ + _))
c.count   // first action: full computation, shuffle files written
c.collect // stages skipped, shuffle output reused
f.count
f.collect // stages skipped, shuffle output reused
val g = f.filter(e => e._1 % 2 == 0)
val h = f.filter(e => e._1 == 657)
val j = f.filter(e => e._1 == 1657)
g.collect
h.collect
j.collect // these have skipped stages as well
A trivial example, but you can see that, because Spark keeps the shuffle output, some aspects need not be done again. It depends on your use cases and on how you read the data initially, is my take.
Note the skipped stages in the Spark UI; things are thus not always as bad as one may think. In some cases your "caching" is achieved this way.
For actions that need different processing, at least the underlying (intermediate) sources need .cache or .persist.
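For example, a minimal sketch of that (the build function and the population/department columns are placeholders, not from the question):

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel

// build(spark, 2019) stands for the expensive construction with all the business rules.
def runSeveralActions(spark: SparkSession, build: (SparkSession, Int) => Dataset[Row]): Unit = {
  import spark.implicits._

  val cities2019 = build(spark, 2019).persist(StorageLevel.MEMORY_AND_DISK)

  cities2019.count()                                // first action materializes the persisted data
  cities2019.filter($"population" > 100000).show()  // reuses it instead of rebuilding from source
  cities2019.groupBy($"department").count().show()  // reuses it as well
}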
If a new spark-submit is used:
- Use IGNITE, or
- Re-use the checkpoint directory, although this is very convoluted; see Spark Checkpointing Non-Streaming. Checkpoint files can be used in a subsequent job run or driver program, but this is only really applicable if multiple actions are possible on the shuffled RDD that is pre-read, else the effect is not so great. Or
- Use a good initial query and do a bucketBy save and re-read (a sketch follows this list). See https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4861715144695760/2994977456373837/5701837197372837/latest.html. Particularly handy in the case of sorting.
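A rough sketch of the bucketBy route (a Hive-enabled session is assumed; the source path, table name and city_id column are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketed-cities").enableHiveSupport().getOrCreate()

// One-off preparation job: write the expensive result bucketed on the lookup/join key.
spark.read.parquet("/data/france/cities_raw")
  .write
  .bucketBy(200, "city_id")
  .sortBy("city_id")
  .mode("overwrite")
  .saveAsTable("cities_2019_bucketed")   // bucketBy only works with saveAsTable

// A later spark-submit just reads the table; joins and aggregations on city_id
// can reuse the bucketing instead of shuffling again.
val cities = spark.table("cities_2019_bucketed")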
Upvotes: 0
Reputation: 56
You would have to save the dataset to HDFS (or whatever other store is being used) and load it whenever required, instead of recomputing the entire dataset again. This is more about how you would design your application. These datasets should probably be precomputed for certain recent years as part of data preparation, so they are always ready to use. This assumes that the next time it runs it is triggered as a new job, e.g. a job running once a day.
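A minimal sketch of that save-or-load pattern (the HDFS path and the buildNetwork function are placeholders, not from the question):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

def networkForYear(spark: SparkSession, year: Int, buildNetwork: Int => DataFrame): DataFrame = {
  val path = s"hdfs:///data/france/network/year=$year"
  val fs   = new Path(path).getFileSystem(spark.sparkContext.hadoopConfiguration)

  if (fs.exists(new Path(path))) {
    spark.read.parquet(path)                                  // precomputed by an earlier job: just load it
  } else {
    buildNetwork(year).write.mode("overwrite").parquet(path)  // compute once, persist for the next run
    spark.read.parquet(path)
  }
}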
Upvotes: 1
Reputation: 108
Redis is the best choice to use with Spark: store the results in Redis and, for the next request, just read them back from Redis instead of recomputing.
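For example, a rough sketch assuming the spark-redis connector (com.redislabs:spark-redis) is on the classpath and spark.redis.host is configured; the table name and key column are made up:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("cities-cache").getOrCreate()
val cities2019 = spark.read.parquet("/data/france/cities_2019")  // placeholder for the computed result

// Store the computed result in Redis once.
cities2019.write
  .format("org.apache.spark.sql.redis")
  .option("table", "cities_2019")
  .option("key.column", "city_id")
  .mode(SaveMode.Overwrite)
  .save()

// Next request: read it back from Redis instead of recomputing.
val fromRedis = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "cities_2019")
  .option("key.column", "city_id")
  .load()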
Upvotes: 0