Reputation: 2514
I used to use df.repartition(1200).write.parquet(...), which created 1200 files, as specified in the repartition argument. I am now using partitionBy, i.e. df.repartition(1200).write.partitionBy("mykey").parquet(...). This works fine, except that it now creates 1200 files per bucket of mykey. I would like to have 1200 files overall.
Other posts suggest repartitioning across certain keys. The relevant documentation for my Spark version (2.4.0) seems to suggest that this feature was added later. Is there any other way to achieve it? I guess I could repartition to 1200 / len(unique("mykey")), but that's a bit hacky. Is there a better way to do it? I am also worried that reducing the number of partitions will result in out-of-memory errors.
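A minimal sketch of that workaround (paths and names here are assumed placeholders, and it presumes the number of distinct keys is well below 1200): repartition(n) distributes rows round-robin, so each key typically shows up in every partition and partitionBy then writes roughly n files per key.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/tmp/input.parquet")  # assumed placeholder for the real input

# spread the desired total of 1200 files over the distinct values of mykey
n_keys = df.select("mykey").distinct().count()
files_per_key = max(1, 1200 // n_keys)

# round-robin repartition: each key ends up in (almost) every partition,
# so partitionBy writes about files_per_key files per key, ~1200 in total
df.repartition(files_per_key).write.partitionBy("mykey").parquet("/tmp/output.parquet")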
Upvotes: 0
Views: 3539
Reputation: 2514
To me it seems the best way to handle it was to order by mykey. This way, the right data is already in the respective partitions, so partitionBy('mykey') does not create too many files (but roughly as many files as there are partitions).
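A minimal sketch of this approach (the output path and the 1200 target are assumed placeholders): orderBy("mykey") range-partitions the data, so each shuffle partition holds a narrow range of key values, and spark.sql.shuffle.partitions controls how many partitions the sort produces.
# the global sort shuffles the data into spark.sql.shuffle.partitions partitions
spark.conf.set("spark.sql.shuffle.partitions", "1200")

# after the sort, each partition contains only a small range of mykey values,
# so partitionBy writes roughly one file per partition (plus a few extra where
# a key value straddles a partition boundary)
df.orderBy("mykey").write.partitionBy("mykey").parquet("/tmp/output.parquet")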
Upvotes: 0
Reputation: 1444
Calling partitionBy on your writer does not change the partitioning scheme of your DataFrame. Instead, it specifies how your data is partitioned once it is written to disk.
Say you have a DataFrame with 200 partitions and you call df.write.partitionBy("mykey").parquet(...). Each of your partitions will bucket its data by the unique values of "mykey", and each bucket in each partition will correspond to one file written to a disk partition.
For example, let's say the DataFrame has 200 rows with mykey=KEY1, and let's say these 200 rows are evenly spread across the 200 partitions, one per partition. When we call df.write.partitionBy("mykey").parquet(...), we will get 200 files in the disk partition mykey=KEY1, one from each DataFrame partition.
To answer your question: there are a few ways of ensuring that exactly 1200 files are written to disk. All of them require precise control over the number of unique values in your partitions.
# requires mykey to have exactly 1200 unique values
df = df.repartition("mykey")
df.write.partitionBy("mykey").parquet(...)
# requires mykey to have exactly 1200 unique values
df = df.coalesce(1)
df.write.partitionBy("mykey").parquet(...)
# requires mykey to have exactly 1 unique value
df = df.repartition(1200)
df.write.partitionBy("mykey").parquet(...)
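As a quick sanity check (not part of the answer), one could count the part files after one of the writes above; this assumes the output was written to the local filesystem under a placeholder path.
import glob
import os

out = "/tmp/output.parquet"  # assumed placeholder for the elided output path
parts = glob.glob(os.path.join(out, "mykey=*", "part-*"))
print("total part files:", len(parts))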
Upvotes: 2
Reputation: 645
I'm not quite sure why you want to do both repartition and partitionBy, but you could do
df = df.repartition(1200)
df = your_processing(df)
df.coalesce(1).write.partitionBy("mykey").parquet(...)
coalesce(1) merges the partitions into a single one, which is then split up again by partitionBy.
Upvotes: 0