Jiew Meng

Reputation: 88357

Why is my parquet partitioned data slower than non-partitioned one?

My understanding is: if I partition my data by a column I will query on, it should be faster. However, when I tried it, it seems to be slower instead. Why?

I have a questions dataframe which I tried writing both partitioned by yearmonth and not.

So I have one dataset partitioned by creation_yearmonth:

questionsCleanedDf.repartition("creation_yearmonth") \
    .write.partitionBy('creation_yearmonth') \
    .parquet('wasb://.../parquet/questions.parquet')

I have another one that is not partitioned:

questionsCleanedDf \
    .write \
    .parquet('wasb://.../parquet/questions_nopartition.parquet')

Then I tried creating a dataframe from each of these two parquet outputs and running the same query

questionsDf = spark.read.parquet('wasb://.../parquet/questions.parquet')

and

questionsDf = spark.read.parquet('wasb://.../parquet/questions_nopartition.parquet')
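
In both cases, the dataframe has to be registered as a temp view so the spark.sql query below can refer to it as questions; a minimal sketch, assuming Spark 2.x:

questionsDf.createOrReplaceTempView('questions')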

The query

spark.sql("""
    SELECT * FROM questions
    WHERE creation_yearmonth = 201606
""")

The unpartitioned one seems to be consistently faster or has similar times (~2-3s), while the partitioned one is slightly slower (~3-4s).
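
A sketch of one way to measure this, assuming simple wall-clock timing around an action (the count() here is just to force execution):

import time

start = time.time()
spark.sql("""
    SELECT * FROM questions
    WHERE creation_yearmonth = 201606
""").count()  # an action forces the query to actually run
print('elapsed: %.2fs' % (time.time() - start))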

I tried to do an explain:

For the partitioned dataset:

== Physical Plan ==
*FileScan parquet [id#6404,title#6405,tags#6406,owner_user_id#6407,accepted_answer_id#6408,view_count#6409,answer_count#6410,comment_count#6411,creation_date#6412,favorite_count#6413,creation_yearmonth#6414] Batched: false, Format: Parquet, Location: InMemoryFileIndex[wasb://[email protected]/parquet/questions.parquet], PartitionCount: 1, PartitionFilters: [isnotnull(creation_yearmonth#6414), (creation_yearmonth#6414 = 201606)], PushedFilters: [], ReadSchema: struct<id:int,title:string,tags:array<string>,owner_user_id:int,accepted_answer_id:int,view_count...

It shows PartitionCount: 1, so since in this case it can go directly to the partition, shouldn't it be faster?

For the non-partitioned one:

== Physical Plan ==
*Project [id#6440, title#6441, tags#6442, owner_user_id#6443, accepted_answer_id#6444, view_count#6445, answer_count#6446, comment_count#6447, creation_date#6448, favorite_count#6449, creation_yearmonth#6450]
+- *Filter (isnotnull(creation_yearmonth#6450) && (creation_yearmonth#6450 = 201606))
   +- *FileScan parquet [id#6440,title#6441,tags#6442,owner_user_id#6443,accepted_answer_id#6444,view_count#6445,answer_count#6446,comment_count#6447,creation_date#6448,favorite_count#6449,creation_yearmonth#6450] Batched: false, Format: Parquet, Location: InMemoryFileIndex[wasb://[email protected]/parquet/questions_nopartition.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(creation_yearmonth), EqualTo(creation_yearmonth,201606)], ReadSchema: struct<id:int,title:string,tags:array<string>,owner_user_id:int,accepted_answer_id:int,view_count...

Also very surprising: the original dataset has dates as strings, so I need to run a query like this:

spark.sql("""
    SELECT * FROM questions
    WHERE CAST(creation_date AS date) BETWEEN '2017-06-01' AND '2017-07-01'
""").show(20, False)

I expected this to be even slower, but it turns out it performs the best (~1-2s). Why is that? I thought in this case it would need to cast each row?

The explain output here:

== Physical Plan ==
*Project [id#6521, title#6522, tags#6523, owner_user_id#6524, accepted_answer_id#6525, view_count#6526, answer_count#6527, comment_count#6528, creation_date#6529, favorite_count#6530]
+- *Filter ((isnotnull(creation_date#6529) && (cast(cast(creation_date#6529 as date) as string) >= 2017-06-01)) && (cast(cast(creation_date#6529 as date) as string) <= 2017-07-01))
   +- *FileScan parquet [id#6521,title#6522,tags#6523,owner_user_id#6524,accepted_answer_id#6525,view_count#6526,answer_count#6527,comment_count#6528,creation_date#6529,favorite_count#6530] Batched: false, Format: Parquet, Location: InMemoryFileIndex[wasb://[email protected]/filtered/questions.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(creation_date)], ReadSchema: struct<id:string,title:string,tags:array<string>,owner_user_id:string,accepted_answer_id:string,v...
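
Side note: assuming the goal is to avoid the double cast visible in the Filter above, one option would be to store creation_date as a proper date column once at write time, so the comparison no longer needs per-row string casts. A hypothetical sketch (the questions_typed.parquet path is made up):

from pyspark.sql import functions as F

questionsCleanedDf \
    .withColumn('creation_date', F.to_date('creation_date')) \
    .write \
    .parquet('wasb://.../parquet/questions_typed.parquet')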

Upvotes: 3

Views: 2457

Answers (1)

Zoltan

Reputation: 3115

Overpartitioning can actually reduce performance:

If a column has only a few rows matching each value, the number of directories to process can become a limiting factor, and the data file in each directory could be too small to take advantage of the Hadoop mechanism for transmitting data in multi-megabyte blocks.

This excerpt was taken from the documentation of a different Hadoop component, Impala, but the argument presented should be valid for all components of the Hadoop stack.

I think that regardless of the partitioning scheme used, the advantages of partitioning will not be apparent until the table grows way beyond 900 MB.
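
A quick way to check whether this applies here is to count how many rows fall into each partition value; if many values have only a handful of rows, the partitioned layout will consist of many small files. A sketch, reusing the column name from the question:

questionsDf.groupBy('creation_yearmonth') \
    .count() \
    .orderBy('creation_yearmonth') \
    .show(100)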

Upvotes: 2
