Ruchira Kariyawasam

Reputation: 440

Performance of Spark DataFrame with Spark SQL

I want to use Spark DataFrames effectively by reducing database round trips and memory usage.

Here I have provided the sample code. (Not the full implementation)

Map<String, String> options = new HashMap<>();
options.put("dbtable", "(select * from TestTable)");

//Create the dataframe
DataFrame df1 = sqlContext.read().format("jdbc").options(options).load();
df1.registerTempTable("TestDBFrame");

//Query1
DataFrame df2 = sqlContext.sql("SELECT name FROM TestDBFrame WHERE age >= 10");

//Query2
DataFrame df3 = sqlContext.sql("SELECT name FROM TestDBFrame WHERE age >= 50");

//df2 operation 
df2.count();

//df3 operation 
df3.count();
  1. When running query1 and query2, how many times does it hit the DB? Does it hit the DB two times?

  2. When we access the count of the df2 and df3 DataFrames, which are based on the originally created DataFrame, does it execute against the DB another two times, or simply load from memory?

Since I need to optimize database round trips and memory usage, I would like to get a better explanation of this.

Upvotes: 1

Views: 878

Answers (1)

zero323

Reputation: 330383

When running query1 and query2, how many times does it hit the DB? Does it hit the DB two times?

Zero times. Since none of the above triggers an action, there is no need to access the database beyond the initial metadata fetch when you call load.
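In other words, sqlContext.sql only builds a logical plan; nothing runs until an action such as count or collect is called. A minimal illustration using the temp table from your code:

//Transformation only - builds a logical plan, no DB access yet
DataFrame names = sqlContext.sql("SELECT name FROM TestDBFrame WHERE age >= 10");

//count() is an action, so this is where the query actually executes
long result = names.count();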

When we access the count of the df2 and df3 DataFrames, which are based on the originally created DataFrame, does it execute against the DB another two times, or simply load from memory?

Each action on a DataFrame will access the database. If you want to minimize database hits, you should consider caching the tables. It won't prevent database access entirely, but it should minimize unnecessary traffic.

You have to remember, though, that it doesn't provide strong guarantees. Spark SQL includes multiple optimizations when working with external sources. For example, df2.count and df3.count will push down predicates, so there may be no data suitable for caching. You can try to isolate the downstream DataFrames as follows:

//Requires: import static org.apache.spark.sql.functions.lit;
DataFrame df1 = sqlContext.read().format("jdbc")
  .options(options).load().where(lit(true));
df1.registerTempTable("TestDBFrame");
sqlContext.cacheTable("TestDBFrame");

It should fetch and cache the complete table on the first access, provided there is enough memory. Just keep in mind that in practice this can be less efficient than letting predicate pushdown do its work.
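For example, with the table cached as above, the first action should populate the cache and later queries should be served from memory, assuming the whole table fits:

//First action materializes the cached table (one full fetch from the DB)
sqlContext.sql("SELECT name FROM TestDBFrame WHERE age >= 10").count();

//Subsequent queries should read from the in-memory cache
sqlContext.sql("SELECT name FROM TestDBFrame WHERE age >= 50").count();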

If you want strong guarantees, you should export the data from the database before reading it in Spark.
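For example, if the table has already been dumped to Parquet, you can read the dump instead of the JDBC source (the path below is just a placeholder):

//Read a previously exported dump instead of hitting the database
DataFrame exported = sqlContext.read().parquet("/path/to/TestTable.parquet");
exported.registerTempTable("TestDBFrame");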

On a side note, it looks like the subquery you use is missing an alias.
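Something along these lines should work (the alias name is arbitrary):

options.put("dbtable", "(select * from TestTable) AS tmp");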

Upvotes: 1
