Oleg

Reputation: 191

Is caching necessary for a dataframe that is reused before the first action?

I have a dataframe which I transform independently in several different ways before joining the results into a final DF. The intermediate transformed dataframes are never used in any "actions"; the first action is called only after all the parts are joined together. My question is: should I cache the first dataframe? Example:

arpu_df=get_arpu_df(..)  # would .cache() help here?
sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
    .union(
        arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
        )\
    .union(
        arpu_df.filter("arpu<=20").sample(False,0.02)
        ).select("base_subsc_id")
sample_by_arpu_ranges.count()

sample is a transformation, as far as I know. I wonder whether the arpu_df part will be recomputed to apply each of the filters, or whether the logical plan builder will understand that it can reuse it in the different parts of the plan?
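For reference, one way to check this (a sketch continuing the snippet above) is to print the query plan and see how many times the source behind arpu_df is scanned:

# Sketch (continuing the snippet above): explain(True) prints the parsed,
# analyzed, optimized and physical plans; if the scan behind arpu_df appears
# once per filter branch, each branch recomputes it from the source.
sample_by_arpu_ranges.explain(True)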

Upvotes: 1

Views: 2404

Answers (2)

abiratsis

Reputation: 7316

The cache is only populated after an action is called, so in your case the answer is no: the cache will not have any benefit before sample_by_arpu_ranges.count() is called. A common workaround is to call a relatively cheap action, count(), right after cache(); your code would then look like this:

arpu_df=get_arpu_df(..)

arpu_df.cache()
arpu_df.count()

sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
    .union(
        arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
        )\
    .union(
        arpu_df.filter("arpu<=20").sample(False,0.02)
        ).select("base_subsc_id")
sample_by_arpu_ranges.count()
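As a side note, you can check the caching state on the DataFrame itself; a small sketch (the names follow the example above):

arpu_df.is_cached     # True as soon as cache()/persist() has been marked on the dataframe
arpu_df.storageLevel  # DataFrame.cache() typically defaults to MEMORY_AND_DISK

# The data is only materialized once an action (here count()) runs; the
# Storage tab of the Spark UI shows what is actually held in memory/disk.

arpu_df.unpersist()   # release the cached data once it is no longer needed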

Upvotes: 1

SimbaPK

Reputation: 596

The answer is in your question: you have only one action, so all of your transformations will be executed at that point. In that case you don't need to persist (or cache) your dataframe.

Persist is useful only if you would otherwise need to recompute the transformations.

Example:

arpu_df=get_arpu_df(..)
sample_by_arpu_ranges=arpu_df.filter("arpu>50").sample(False,0.4)\
    .union(
        arpu_df.filter("arpu>20 and arpu<=50").sample(False,0.1)
        )\
    .union(
        arpu_df.filter("arpu<=20").sample(False,0.02)
        ).select("base_subsc_id").persist()  # persist sample_by_arpu_ranges here because you know you will run multiple actions on it

sample_by_arpu_ranges.count()  # 1st action

sample_by_arpu_ranges.write.parquet("path")  # 2nd action

In the example, sample_by_arpu_ranges will be persisted during the 1st action, so for the 2nd action it will already be available in the cache.

-> Without persist, with one action:

arpu_df = spark.read.parquet(path) 
sample_by_arpu_ranges=arpu_df.filter(...)
sample_by_arpu_ranges.count()

What's happening:

  • sample_by_arpu_ranges.count()
  • arpu_df = spark.read.parquet(path)
  • sample_by_arpu_ranges=arpu_df.filter(...)
  • count

--> neither arpu_df nor sample_by_arpu_ranges is kept, but you don't need them anymore

-> Without persist, with multiple actions:

arpu_df = spark.read.parquet(path) 
sample_by_arpu_ranges=arpu_df.filter(...)
arpu_df.count()
sample_by_arpu_ranges.count()

What's happening:

  • arpu_df.count()
  • arpu_df = spark.read.parquet(path)
  • count

--> does not keep arpu_df!

  • sample_by_arpu_ranges.count()
  • arpu_df = spark.read.parquet(path)  # you have to read it again!
  • sample_by_arpu_ranges=arpu_df.filter(...)
  • count

-> With persist, with multiple actions:

arpu_df = spark.read.parquet(path).persist()
sample_by_arpu_ranges=arpu_df.filter(...)
arpu_df.count()
sample_by_arpu_ranges.count()

What's happening:

  • arpu_df.count()
  • arpu_df = spark.read.parquet(path)
  • persist ---> saves arpu_df in the cache
  • count
  • sample_by_arpu_ranges.count()
  • sample_by_arpu_ranges=arpu_df.filter(...)  # arpu_df is taken from the cache, no need to read it again
  • count
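Putting the last scenario together as a runnable sketch (the parquet path and filter condition are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

arpu_df = spark.read.parquet("path/to/arpu").persist()   # placeholder path
sample_by_arpu_ranges = arpu_df.filter("arpu > 50")      # placeholder condition

arpu_df.count()                # 1st action: reads the parquet and fills the cache
sample_by_arpu_ranges.count()  # 2nd action: reuses the cached arpu_df

arpu_df.unpersist()            # free the cached blocks when done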

Upvotes: 0
