gfytd

Reputation: 1809

Why doesn't the Spark DataFrame cache work here?

I just wrote a toy class to test Spark DataFrames (actually Dataset, since I'm using Java).

Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
// action 1: append the rows back into the source table
ds.write().mode(SaveMode.Append).insertInto("test2.dummy");
// action 2: count the rows
System.out.println(ds.count());

According to my understanding, there are two actions here: "insertInto" and "count".

I debugged the code step by step. When running "insertInto", I see several lines like:

19/01/21 20:14:56 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]

When running "count", I still see similar logs:

19/01/21 20:15:26 INFO FileScanRDD: Reading File path: hdfs://ip:9000/root/hive/warehouse/test2.db/dummy/dt=2018-12-12/000000_0, range: 0-451, partition values: [2018-12-12]

I have 2 questions:

1) When there are two actions on the same dataframe like above, if I don't call ds.cache or ds.persist explicitly, will the second action always cause the SQL query to be re-executed?

2) If I understand the logs correctly, both actions trigger an HDFS file read. Does that mean ds.cache() doesn't actually work here? If so, why doesn't it work?

Many thanks.

Upvotes: 7

Views: 5086

Answers (2)

fuzzycuffs

Reputation: 139

I found that the other answer doesn't work. What I had to do was break the lineage, so that the DataFrame I was writing did not know that one of its sources is the table I am writing to. To break the lineage, I created a copy of the DataFrame using:

copy_of_df = sql_context.createDataFrame(df.rdd)
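
For the Java API used in the question, a minimal sketch of the same lineage-breaking trick might look like the following (assuming spark is the existing SparkSession and ds is the Dataset from the question; the round trip through the RDD gives the write a plan that no longer references test2.dummy as a source):

// Sketch only: rebuild the Dataset from its RDD so the new plan
// does not list the target table as one of its sources.
Dataset<Row> detached = spark.createDataFrame(ds.javaRDD(), ds.schema());
detached.write().mode(SaveMode.Append).insertInto("test2.dummy");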

Upvotes: 0

Raphael Roth

Reputation: 27383

It's because you append into the table that ds is created from, so ds needs to be recomputed because the underlying data changed. In such cases, Spark invalidates the cache. See e.g. this Jira (https://issues.apache.org/jira/browse/SPARK-24596):

When invalidating a cache, we invalidate other caches dependent on this cache to ensure cached data is up to date. For example, when the underlying table has been modified or the table has been dropped itself, all caches that use this table should be invalidated or refreshed.

Try running ds.count() before inserting into the table.
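
Applied to the snippet in the question, a minimal sketch of that reordering could look like this (same spark session, lit and SaveMode as in the question; the idea is that count() materializes the cache before the write modifies the source partition):

Dataset<Row> ds = spark.sql("select id,name,gender from test2.dummy where dt='2018-12-12'");
ds = ds.withColumn("dt", lit("2018-12-17"));
ds.cache();
// Materialize the cache first: this is the scan of the HDFS files.
System.out.println(ds.count());
// The append can now read from the cached data instead of re-scanning the source,
// even though the cache referencing test2.dummy may be refreshed after the write.
ds.write().mode(SaveMode.Append).insertInto("test2.dummy");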

Upvotes: 6
