Reputation: 1478
I have a performance issue with queries after partitioning.
I have a daily parquet file of around 30 million rows and 20 columns. For example, the file data_20210721.parquet
looks like:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
We have code that processes it so each interval covers a single day, splitting rows at midnight, such that we get:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
Line 2 can be called a residual because it does not belong to the same day as the file.
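For illustration, one way to express such a midnight split in PySpark, using the column names above and assuming an interval never spans more than one midnight (a sketch, not our actual processing code):

from pyspark.sql import functions as F

# Rows fully contained in one calendar day are kept unchanged.
same_day = df.filter(F.to_date("date_from") == F.to_date("date_to"))

# Rows crossing midnight are split into two pieces (a row ending exactly at
# midnight would produce an empty residual; ignored in this sketch).
crossing = df.filter(F.to_date("date_from") < F.to_date("date_to"))

# First piece: ends exactly at midnight, keeps the day of date_from.
first_part = (crossing
    .withColumn("date_to", F.date_trunc("day", F.col("date_to")))
    .withColumn("daytime", F.to_date("date_from")))

# Residual piece: starts at midnight and belongs to the day of date_to.
residual = (crossing
    .withColumn("date_from", F.date_trunc("day", F.col("date_to")))
    .withColumn("daytime", F.to_date("date_to")))

df_split = same_day.unionByName(first_part).unionByName(residual)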
Then we wanted to generate one parquet file per daytime, so the default solution was to process each file and save the dataframe with:
df.write.partitionBy(["id", "daytime"]).mode("append").parquet("hdfs/path")
The mode is set to append because the next day, we may have residuals from past / future days.
There are also other levels of partitioning. Even though the partitions are quite "balanced" in terms of rows, the processing time becomes incredibly slow.
For example, to count the number of rows per day for a given set of dates, reading the raw daily files directly:
spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()
We thought there were too many small files at the final level (because of the append mode, there are around 600 very small files of a few KB/MB each), so we tried to coalesce them within each partition, but there was no improvement. We also tried to partition only on daytime (in case having too many levels of partitioning creates issues).
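For reference, reducing the number of output files per partition usually looks like this (a sketch with the column names above, not our exact code):

# Repartition by the partition columns before writing so each (id, daytime)
# combination is produced by a single task, i.e. roughly one file per
# partition directory instead of hundreds of tiny ones.
(df.repartition("id", "daytime")
   .write
   .partitionBy("id", "daytime")
   .mode("append")
   .parquet("hdfs/path"))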
Are there any solutions to improve the performance (or to understand where the bottleneck is)?
Could it be linked to the fact that we are partitioning on a date column? I have seen many examples partitioned by year/month/day (three integer columns), but that does not fit our needs.
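For reference, the year/month/day pattern I am referring to looks roughly like this (a sketch, with column names assumed from our schema):

from pyspark.sql import functions as F

# Derive integer year / month / day columns from the date and partition on
# those instead of the date column itself.
(df.withColumn("year", F.year("daytime"))
   .withColumn("month", F.month("daytime"))
   .withColumn("day", F.dayofmonth("daytime"))
   .write
   .partitionBy("year", "month", "day")
   .mode("append")
   .parquet("hdfs/path"))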
This solution solved a lot of the problems we had, but the loss of performance is far too great to keep it as is. Any suggestion is welcome :)
The issue comes from the fact that the plan is not the same between:
spark.read.parquet("path/to/data/DayTime=2021-07-10")
and
spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")
Here is the plan for a small example where DayTime has been converted to a numeric column (ts), as I thought the slowness might be due to the datatype:
spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)
== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
+- Relation[date_from#4297,date_to#4298, ....] parquet
== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ts: int, ....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet
== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#4297,date_to#4298,ts#4308, ....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf], PartitionCount: 1, PartitionFilters: [isnotnull(ts#4308), (ts#4308 = 20200103)], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....
vs
spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)
== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet
== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ....
Relation[date_from#2086,date_to#2087, ....] parquet
== Optimized Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#2086,date_to#2087, .....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf/ts=20200103], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....
Thanks in advance,
Nicolas
Upvotes: 2
Views: 691
Reputation: 2091
You have to ensure that your filter is actually utilising the partitioned structure, pruning at the disk level rather than bringing all the data into memory and then applying the filter.
Try checking your physical plan:
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()
It should have an entry similar to PartitionFilters: [isnotnull(DayTime#123), (DayTime#123 = <your condition>)].
My guess is that in your case it is not utilising these PartitionFilters, and the whole data set is scanned.
I would suggest experimenting with your syntax / repartitioning strategy on a small data set until you see the expected PartitionFilters in the plan.
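For example, something along these lines (a minimal sketch with made-up paths and values) lets you quickly verify whether the predicate ends up in PartitionFilters:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Tiny partitioned dataset written to a throwaway path, just to inspect the plan.
small = spark.createDataFrame(
    [("A", "2021-07-10"), ("A", "2021-07-11"), ("B", "2021-07-12")],
    ["reference", "DayTime"],
).withColumn("DayTime", F.to_date("DayTime"))

small.write.partitionBy("DayTime").mode("overwrite").parquet("/tmp/partition_test")

# The FileScan line should show the predicate under PartitionFilters; if that
# list is empty, the filter is only applied after scanning everything.
(spark.read.parquet("/tmp/partition_test")
      .filter(F.col("DayTime") == F.lit("2021-07-10").cast("date"))
      .explain())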
Upvotes: 1