m1nkeh

Reputation: 1397

databricks partitioning w/ relation to predicate pushdown

I have searched a lot for a succinct answer, hopefully someone can help me with some clarity on databricks partitioning..

assume i have a data frame with columns: Year, Month, Day, SalesAmount, StoreNumber

I want to store this partitioned by Year, & Month.. so i can run the following command:

df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true')

This will output data in the format of: /mnt/path/Year=2019/Month=05/<file-0000x>.csv

If i then load it back again, such as:

spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1")

Q1: This has not actually 'read' the data yet, right? i.e. i could have billions of records.. but until i actually query temp1, nothing is executed against the source?

Q2-A: Subsequently, when querying this data using temp1, it is my assumption that if i include the items that were used in the partitioning in the where clause, a smart filtering on the actual files that are read off the disk will be applied?

%sql
select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL

whereas the following would not do any file filtering as it has no context of which partitions to look in:

%sql
select * from temp1 where StoreNumber = 152 and SalesAmount > 10000 -- SUB-OPTIMAL

Q2-B: Finally, if i stored the files in parquet format (rather than *.csv).. would both of the queries above 'push down' in to the actual data stored.. but in perhaps different ways?

I.e. the first would still use the partitions, but the second (where StoreNumber = 152 and SalesAmount > 10000) will now use columnar storage of parquet? While *.csv does not have that optimisation?

Can anyone please clarify my thinking / understanding around this?

links to resources would be great also..

Upvotes: 4

Views: 1922

Answers (2)

abiratsis

Reputation: 7316

A1: You are right about the evaluation of createOrReplaceTempView. It is evaluated lazily for the current Spark session. In other words, if you terminate the Spark session without querying the view, the data will never be transferred into temp1.

A2: Let's examine the case through an example using your code. First let's save your data with:

df.write.mode("overwrite").option("header", "true")
  .partitionBy("Year", "Month")
  .format("csv")
  .save("/tmp/partition_test1/")

And then load it with:

val df1 = spark.read.option("header", "true")
                .csv("/tmp/partition_test1/")
                .where($"Year" === 2019 && $"Month" === 5)

Executing df1.explain will return:

== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, PartitionFilters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

As you can see, the PushedFilters: [] array is empty while PartitionFilters is not, indicating that Spark was able to filter on the partition columns and therefore prune the partitions that do not satisfy the where clause.
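To make the idea of partition pruning concrete, here is a minimal pure-Python sketch of the principle. It is not Spark's implementation: the helper names `partition_values` and `prune` and the file listing are hypothetical, and values are compared as plain strings, whereas Spark infers a type for each partition column.

```python
from pathlib import PurePosixPath


def partition_values(path):
    """Parse Hive-style key=value directory names out of a file path."""
    values = {}
    for part in PurePosixPath(path).parts:
        if "=" in part:
            key, _, value = part.partition("=")
            values[key] = value
    return values


def prune(paths, **wanted):
    """Keep only the files whose partition directories match every wanted value.

    The decision is made from the path alone, so rejected files are never opened.
    """
    kept = []
    for path in paths:
        values = partition_values(path)
        # Simplification: compare as strings (so "05" and "5" would differ here).
        if all(values.get(key) == str(value) for key, value in wanted.items()):
            kept.append(path)
    return kept


# Hypothetical listing of the partitioned output directory.
files = [
    "/tmp/partition_test1/Year=2011/Month=11/part-00000.csv",
    "/tmp/partition_test1/Year=2019/Month=5/part-00000.csv",
    "/tmp/partition_test1/Year=2019/Month=6/part-00000.csv",
]

selected = prune(files, Year=2019, Month=5)
print(selected)
```

A filter on a non-partition column such as StoreNumber gives `prune` nothing to work with, so every file stays in the list; that is the situation in which only pushed filters can help.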

If we slightly change the Spark query to:

spark.read.option("header", "true")
  .csv("/tmp/partition_test1/")
  .where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11)
  .explain

== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
   +- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, PartitionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

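Now both PartitionFilters and PushedFilters take effect, minimizing Spark's workload. Spark first uses PartitionFilters to select the matching partitions and then applies predicate pushdown to the files inside them. Note that in this plan only IsNotNull(StoreNumber) is pushed down; the equality test stays in a separate Filter step, apparently because StoreNumber is read as a string and has to be cast to int before the comparison.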

Exactly the same applies to Parquet files, with one big difference: Parquet makes much better use of the pushed filters by combining them with its internal columnar layout (as you already mentioned), which keeps metrics and statistics about the data. With CSV, predicate pushdown happens while Spark is reading/scanning the files, excluding records that do not satisfy the pushed condition. With Parquet, the pushed filter is propagated to the Parquet reader itself, resulting in even more data being pruned.
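The statistics-based skipping can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not the Parquet reader: `make_row_group` and `scan_equals` are hypothetical helpers, the row groups are in-memory lists, and only min/max statistics for a single column are modelled.

```python
def make_row_group(rows, column):
    """Bundle rows with min/max statistics for one column.

    In this model the statistics are computed once at write time, so a
    reader can consult them without touching the rows themselves.
    """
    values = [row[column] for row in rows]
    return {"rows": rows, "min": min(values), "max": max(values)}


def scan_equals(row_groups, column, target):
    """Return the matching rows and the number of row groups actually read."""
    matches, groups_read = [], 0
    for group in row_groups:
        if target < group["min"] or target > group["max"]:
            continue  # the statistics prove that no row here can match
        groups_read += 1
        matches.extend(row for row in group["rows"] if row[column] == target)
    return matches, groups_read


# Hypothetical data, laid out so that StoreNumber ranges do not overlap.
groups = [
    make_row_group([{"StoreNumber": 1, "SalesAmount": 500},
                    {"StoreNumber": 40, "SalesAmount": 12000}], "StoreNumber"),
    make_row_group([{"StoreNumber": 100, "SalesAmount": 15000},
                    {"StoreNumber": 152, "SalesAmount": 20000}], "StoreNumber"),
    make_row_group([{"StoreNumber": 200, "SalesAmount": 9000},
                    {"StoreNumber": 310, "SalesAmount": 30000}], "StoreNumber"),
]

matches, groups_read = scan_equals(groups, "StoreNumber", 152)
print(matches, groups_read)
```

How much gets skipped depends on how the data is laid out: if every row group contained a wide range of StoreNumber values, the min/max test would rarely rule anything out.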

In your case, querying the data through the view created with createOrReplaceTempView makes no difference: the execution plan remains the same.

Some useful links:

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkStrategy-FileSourceStrategy.html

Upvotes: 4

simon_dmorias

Reputation: 2473

Q1: When you read CSV files without providing a schema, Spark has to infer the schema, so a read of all the files happens immediately (it may filter the partitions at this point if it can). If you provide a schema, your assumptions about filtering are correct, as are your assumptions about when execution happens.

Q2: Not sure I follow. When you say two queries, do you mean the ones above or below? The one below does a write; how do you expect filtering to work on a write?

If you are referring to the first two queries run against Parquet, then the first will eliminate most files and be very quick. The second will hopefully skip some data by using the statistics stored in the files to show that it doesn't need to read it, but it will still touch every file.

You may find this useful https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

Upvotes: 1
