Nicolas M.

Reputation: 1478

pyspark - calculation of partitioned data (created with "append" mode) slow

I have a performance issue with queries after partitioning.

I have a daily parquet file of around 30 million rows and 20 columns. For example, the file data_20210721.parquet looks like:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

We have code that processes it so that each row covers only a single day, cutting intervals at midnight, such that we end up with:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A         | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

Line 2 can be called a residual because it does not belong to the same day as the file.
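For illustration, here is a simplified sketch of that split (not our exact code; it assumes an interval crosses at most one midnight and reuses the column names from the tables above):

from pyspark.sql import functions as F

df = spark.read.parquet("path/to/data_20210721.parquet")

# Midnight following the start of the interval.
midnight = F.date_add(F.to_date("date_from"), 1).cast("timestamp")

# Rows fully contained in one day are kept as-is.
same_day = df.filter(F.to_date("date_from") == F.to_date("date_to"))

# Rows crossing midnight are split in two.
crossing = df.filter(F.to_date("date_from") != F.to_date("date_to"))

# First half: truncated at midnight, daytime = start day.
first_half = (crossing
              .withColumn("date_to", midnight)
              .withColumn("daytime", F.to_date("date_from")))

# Residual: starts at midnight, daytime = the following day.
residual = (crossing
            .withColumn("date_from", midnight)
            .withColumn("daytime", F.to_date("date_from")))

split_df = same_day.unionByName(first_half).unionByName(residual)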

We then wanted to generate one parquet dataset per daytime, so the default solution was to process each file and save the dataframe with:

df.write.partitionBy(["id", "daytime"]).mode("append").parquet("hdfs/path")

The mode is set to append because, the next day, we may have residuals from past / future days.

There are also other levels of partitioning, such as the id column used in the write above.

Even though the partitions are quite "balanced" in terms of rows, the processing time became incredibly slow.

For example, counting the number of rows per day for a given set of dates:

spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()

We thought that there were too many small partitions at the final level (because of the append mode, there are around 600 very small files of a few KB/MB each), so we tried to coalesce them for each partition (sketched below), but there was no improvement. We also tried partitioning only on daytime (in case having too many levels of partitioning was causing the issue).
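Roughly, the compaction idea was the following (a sketch, assuming it is acceptable to shuffle on the partition columns before the write):

# df is the processed dataframe from above. Repartitioning on the partition
# columns means each (id, daytime) partition is written as roughly one file
# instead of hundreds of tiny ones.
(df.repartition("id", "daytime")
   .write
   .partitionBy("id", "daytime")
   .mode("append")
   .parquet("hdfs/path"))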

Are there any solutions to improve the performance (or to understand where the bottleneck is)? Could it be linked to the fact that we are partitioning on a date column? I have seen a lot of examples with partitioning by year/month/day (three integers), but that does not fit our needs.

This solution was perfect for solving a lot of problems we had, but the loss of performance is far too great to keep it as is. Any suggestion is welcome :)

EDIT 1 :

The issue comes from the fact that the plan is not the same between:

spark.read.parquet("path/to/data/DayTime=2021-07-10")

and

spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")

Here are the plans for a small example where DayTime has been converted to a long (named ts), as I thought the slowness might be due to the datatype:

spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)

== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
      +- Relation[date_from#4297,date_to#4298, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ts: int, ....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,date_to#4298,ts#4308, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#4297,date_to#4298,ts#4308, ....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf], PartitionCount: 1, PartitionFilters: [isnotnull(ts#4308), (ts#4308 = 20200103)], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....

vs

spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)

== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Analyzed Logical Plan ==
date_from: timestamp, date_to: timestamp, ....
Relation[date_from#2086,date_to#2087, ....] parquet

== Optimized Logical Plan ==
Relation[date_from#2086,date_to#2087, ....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#2086,date_to#2087, .....] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://.../test_perf/ts=20200103], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<date_from:timestamp,date_to:timestamp, ....
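For completeness, reading a single partition directory while still exposing the partition column seems possible with the basePath option (a sketch; not something we currently use):

# The basePath option tells Spark where partition discovery should start,
# so DayTime is still available as a column even though only one
# partition directory is read.
df_one_day = (spark.read
              .option("basePath", "path/to/data")
              .parquet("path/to/data/DayTime=2021-07-10"))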

Thanks in advance,

Nicolas

Upvotes: 2

Views: 691

Answers (1)

Sanket9394

Reputation: 2091

You have to ensure that your filter is actually using the partitioned structure, pruning at the disk level rather than bringing all the data into memory and then applying the filter.

Try checking your physical plan:

spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()

It should contain an entry similar to PartitionFilters: [isnotnull(DayTime#123), (DayTime#123 = <your condition>)].

My guess is that in your case it is not using these PartitionFilters, and the whole dataset is being scanned.

I would suggest experimenting with your syntax / repartition strategy on a small dataset until you see PartitionFilters appear in the plan.
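For example, a quick experiment on toy data (the path and values here are just placeholders):

from pyspark.sql import functions as F

# Write a tiny dataset partitioned by DayTime.
small = spark.createDataFrame(
    [("A", "2021-07-10"), ("B", "2021-07-11")],
    ["reference", "DayTime"],
)
small.write.mode("overwrite").partitionBy("DayTime").parquet("/tmp/partition_test")

# Read it back with a filter on the partition column and check that the
# physical plan shows PartitionFilters: [... (DayTime = 2021-07-10)].
(spark.read.parquet("/tmp/partition_test")
      .filter(F.col("DayTime") == "2021-07-10")
      .explain())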

Upvotes: 1
