thentangler

Reputation: 1256

partitioning and re-partitioning parquet files using pyspark

I have a parquet partitioning issue that I am trying to solve. I have read a lot of material on partitioning on this site and around the web, but I still couldn't resolve my problem.

Step 1: I have a large dataset (~2TB) that has MODULE and DATE columns and is partitioned by DATE, covering 86 days. Each DATE partition has 21 files, so the dataset has a total of 86 * 21 = 1806 files.
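For reference, loading the original dataset looks roughly like this (the path is a placeholder, not my actual bucket):

df = spark.read.parquet('s3://path/original/')  # DATE comes back as a column from the partition folders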

Step 2: I needed to aggregate the data based on the MODULE column, so I loaded the dataset and saved it as another parquet, partitioning it by MODULE. There are 9 modules and each module contains data from all 86 days, so the resulting parquet had 9 * 1806 = 16254 files.
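Roughly what I did (paths are placeholders):

df = spark.read.parquet('s3://path/original/')
df.write.partitionBy('MODULE').parquet('s3://path/by_module/')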

Step 3: I loaded each MODULE partition through a for loop, performed my aggregations, and saved the output back in append mode so that I have the 9 modules as folders: s3://path/MODULE A/, s3://path/MODULE B/, etc. They are not partitioned by MODULE, just saved as folders. Since my default number of Spark partitions was 201, each module folder had 201 files, for a total of 9 * 201 = 1809 files.
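The loop looked roughly like this (paths are placeholders, and the groupBy/sum on a hypothetical value column stands in for my real aggregations):

from pyspark.sql import functions as F

for module in ['MODULE A', 'MODULE B']:  # ...and so on for all 9 modules
    mod_df = spark.read.parquet('s3://path/by_module/MODULE=' + module + '/')
    agg_df = mod_df.groupBy('DATE').agg(F.sum('value').alias('value'))  # placeholder aggregation
    agg_df.write.mode('append').parquet('s3://path/' + module + '/')  # ends up as ~201 files per folder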

Step 4: So far so good, but I needed to partition the data back by DATE. So I looped through each MODULE folder and saved the output as a single unpartitioned parquet dataset. That resulted in a total of 2751 files; I don't know how this number comes about.
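Something like this (again with placeholder paths):

for module in ['MODULE A', 'MODULE B']:  # ...and so on for all 9 modules
    agg_df = spark.read.parquet('s3://path/' + module + '/')
    agg_df.write.mode('append').parquet('s3://path/unpartitioned/')  # no partitionBy here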

Step 5: I then loaded the entire unpartitioned dataset and saved it again, partitioning by DATE. This resulted in about 39k files, each about 1.5 MB. So I have a huge number of small files, and it takes an immensely long time to load the parquet or do any kind of operation on it, such as groupBy.
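Step 5 was essentially (placeholder paths again):

full_df = spark.read.parquet('s3://path/unpartitioned/')
full_df.write.partitionBy('DATE').parquet('s3://path/final/')  # -> ~39k small files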

After doing some more reading, I tried using repartition(1).partitionBy('DATE') in step 4 to reduce the number of files, but it failed towards the end. I know I am doing something wrong from step 4 onwards. Is there a more efficient way to do the whole thing?
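What I tried there looked roughly like this, run for each module's dataframe as in step 3 (placeholder path):

agg_df.repartition(1).write.mode('append').partitionBy('DATE').parquet('s3://path/final/')  # everything pulled into a single partition first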

Thank You

Upvotes: 1

Views: 3010

Answers (1)

Shubham Jain

Reputation: 5536

Finding the correct number of partitions is your concern, then.

Suppose you have 86 days of data and you want to save it partitioned by date. Then you should decide how many files you want to create under each partition.

Suppose you have 3 GB of data for each date; then you probably want at least 6 files in each date folder (roughly 512 MB per file).

You can achieve this as follows:

df.repartition(6,'date').write.partitionBy('date')...

Now, if you want to restrict the number of records in each file, use the maxRecordsPerFile option:

df.repartition(6, 'date').write.option("maxRecordsPerFile", 10000).partitionBy('date')...
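Putting it together, a complete version would look something like this, using the DATE column from your question (the input and output paths here are just examples):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet('s3://path/unpartitioned/')  # example input path

(df.repartition(6, 'DATE')                 # repartition into 6 partitions keyed by DATE
   .write
   .option('maxRecordsPerFile', 10000)     # cap the rows written per output file
   .mode('overwrite')
   .partitionBy('DATE')
   .parquet('s3://path/final/'))           # example output path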

Upvotes: 1
