Reputation: 191
I have a dataframe that I transform independently in several different ways before joining the results into the final DF. The intermediate dataframes are never used in any action; the first action is called only after all the parts are joined together. My question is: should I cache the first dataframe in that case? Example:
arpu_df=get_arpu_df(..)  # will .cache() help here?
sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
.union(
arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
)\
.union(
arpu_df.filter("arpu<=20").sample(False,0.02)
).select("base_subsc_id")
sample_by_arpu_ranges.count()
sample is a transformation, as far as I know. I wonder whether the arpu_df part will be recomputed to apply each of the filters, or whether the logical plan builder will understand that it can reuse it in the different parts of the plan?
Upvotes: 1
Views: 2404
Reputation: 7316
Caching is triggered only by an action, so in your case the answer is no: the cache will not be beneficial before calling sample_by_arpu_ranges.count(). A common workaround is to call a less expensive action, such as count(), right after cache(), so your code would look like this:
arpu_df=get_arpu_df(..)
arpu_df.cache()
arpu_df.count()
sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
.union(
arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
)\
.union(
arpu_df.filter("arpu<=20").sample(False,0.02)
).select("base_subsc_id")
sample_by_arpu_ranges.count()
Upvotes: 1
Reputation: 596
The answer is in your question. You only have one action, so all your transformations will be executed at that point. In that case you don't need to persist (or cache) your dataframe.
Persist is useful only if the transformations would otherwise have to be computed again.
Example:
arpu_df=get_arpu_df(..)
sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
.union(
arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
)\
.union(
arpu_df.filter("arpu<=20").sample(False,0.02)
).select("base_subsc_id").persist()  # persist sample_by_arpu_ranges here because you know you will run multiple actions on it
sample_by_arpu_ranges.count()  # 1st action
sample_by_arpu_ranges.write.parquet("path")  # 2nd action
In this example, sample_by_arpu_ranges is persisted during the 1st action, so it is already available for the 2nd action.
-> Without persist with one action :
arpu_df = spark.read.parquet(path)
sample_by_arpu_ranges=arpu_df.filter(...)
sample_by_arpu_ranges.count()
What's happening:
--> Spark does not keep arpu_df or sample_by_arpu_ranges, but you don't need them anymore
-> Without persist with multiple actions :
arpu_df = spark.read.parquet(path)
sample_by_arpu_ranges=arpu_df.filter(...)
arpu_df.count()
sample_by_arpu_ranges.count()
What's happening:
--> arpu_df.count() does not keep arpu_df!
--> sample_by_arpu_ranges.count() therefore has to run the whole lineage again:
arpu_df = spark.read.parquet(path)  # you have to read again!
sample_by_arpu_ranges=arpu_df.filter(...)
count
-> With persist with multiple actions :
arpu_df = spark.read.parquet(path).persist()
sample_by_arpu_ranges=arpu_df.filter(...)
arpu_df.count()
sample_by_arpu_ranges.count()
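What's happening:
--> arpu_df is read once and kept during arpu_df.count(), so sample_by_arpu_ranges.count() filters the persisted data instead of reading the file again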
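To make the "multiple actions" cases above concrete, here is a small toy model in plain Python. It is only a sketch of the idea and not Spark's API: LazyFrame, read_source and run are hypothetical names made up for this illustration, and the four sample values stand in for the parquet file. It counts how many times the source is read when two actions are run, with and without persist.

```python
# Toy model of lazy evaluation. This is NOT Spark's API:
# LazyFrame, read_source and run are hypothetical names for illustration only.
class LazyFrame:
    """A dataset described by a recipe that runs only when an action is called."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function that produces the rows
        self._persist = False     # set by persist(); nothing is stored yet
        self._stored = None       # filled in by the first action after persist()

    def persist(self):
        # Only marks the frame; the data is kept when an action first computes it.
        self._persist = True
        return self

    def _rows(self):
        if self._stored is not None:
            return self._stored           # reuse the kept result
        rows = self._compute()            # otherwise run the recipe again
        if self._persist:
            self._stored = rows
        return rows

    def filter(self, predicate):
        # Transformation: builds a new lazy frame that depends on this one.
        return LazyFrame(lambda: [r for r in self._rows() if predicate(r)])

    def count(self):
        # Action: triggers the whole lineage.
        return len(self._rows())


def read_source(log):
    # Stand-in for spark.read.parquet(path); each real read is recorded in log.
    def compute():
        log.append("read")
        return [10, 25, 60, 80]
    return LazyFrame(compute)


def run(use_persist):
    log = []
    arpu_df = read_source(log)
    if use_persist:
        arpu_df.persist()
    sample_by_arpu_ranges = arpu_df.filter(lambda arpu: arpu > 50)
    arpu_df.count()                 # 1st action
    sample_by_arpu_ranges.count()   # 2nd action
    return len(log)                 # number of times the source was read


reads_without_persist = run(use_persist=False)
reads_with_persist = run(use_persist=True)
print(reads_without_persist, reads_with_persist)  # prints "2 1"
```

In this model the second action has to go back to the source when nothing was persisted, which is the "you have to read again" step described above. Real Spark adds partitions, storage levels and eviction on top of this, so treat the numbers as an illustration of the principle only.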
Upvotes: 0