thentangler

Reputation: 1256

Optimizing reading from partitioned parquet files in an S3 bucket

I have a large dataset in Parquet format (~1 TB) that is partitioned into a two-level hierarchy: CLASS and DATE. There are only 7 classes, but DATE keeps growing from 2020-01-01 onwards. My data is partitioned by CLASS first and then DATE.

So something like:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

I load my data by CLASS in a for-loop. If I load the entire Parquet dataset at once, YARN kills the job because it overloads the memory of the instances. Within each CLASS, though, I have to load all the dates, since I am doing a percentile calculation in my modeling. This method takes about 23 hours to complete.

However, if I repartition the data so that it has only the CLASS partition, the job takes about 10 hours. Does having too many sub-partitions slow down the Spark executor jobs? I keep the CLASS -> DATE partition hierarchy only because I need to append new data by DATE every day. If having a single partition level is more efficient, then I would have to repartition down to just the CLASS partition every day after loading new data. Could someone explain why the single partition level works faster? And if so, what would be the best way to partition the data daily by appending, without repartitioning the entire dataset?
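
By "append" I mean roughly a partitioned write in append mode, along these lines (daily_df here just stands in for that day's new records):

daily_df.write \
    .mode('append') \
    .partitionBy('CLASS', 'DATE') \
    .parquet('s3://bucket/file.parquet/')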

Thank You

EDIT: I use a for loop over the file structure to iterate over the CLASS partitions, like so:

import s3fs

fs = s3fs.S3FileSystem(anon=False)
inpath = "s3://bucket/file.parquet/"

dirs = fs.ls(inpath)                      # one entry per CLASS=... sub-folder
for path in dirs:
    customPath = 's3://' + path + '/'
    class_name = path.split('=')[1]
    df = spark.read.parquet(customPath)
    outpath = "s3://bucket/Output_" + class_name + ".parquet"
    # Perform calculations
    df.write.mode('overwrite').parquet(outpath)

The loaded df then has all the dates for a single CLASS (e.g. CLASS=1). I write the results out as a separate parquet file per CLASS, so that I end up with 7 parquet files:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

Merging the 7 parquets into a single parquet afterwards is not a problem, since the resulting parquet files are much smaller.
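
The merge itself is just reading the per-class outputs back and writing them out once more; roughly like this (the combined output name here is only illustrative):

# Glob over the 7 small per-class outputs and rewrite them as a single parquet
merged = spark.read.parquet("s3://bucket/Output_*.parquet")
merged.coalesce(1).write.mode('overwrite').parquet("s3://bucket/Output_all.parquet")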

Upvotes: 4

Views: 14686

Answers (1)

Lamanus

Reputation: 13591

I have data partitioned by three columns: year, month, and id. The folder hierarchy is

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

and I can read the DataFrame by loading the root path.

val df = spark.read.parquet("s3://mybucket/")

The partition columns are then automatically added to the DataFrame, and you can filter on them like this:

val df_filtered = df.filter("year = '2020' and month = '09'")

When you then do something with df_filtered, Spark reads only the matching partitions (partition pruning).
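
You can check that the pruning actually happens by looking at the physical plan; from PySpark (which the question uses) a quick sketch would be:

# The FileScan node of the plan lists the pruned predicates under PartitionFilters
df_filtered = spark.read.parquet("s3://mybucket/").filter("year = '2020' and month = '09'")
df_filtered.explain()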


For your repeated processing, you can use Spark's fair scheduler. Add a fair.xml file to src/main/resources of your project with the content below,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

and set the scheduler configuration when building the Spark session (spark.scheduler.mode and the allocation file are application-level settings; only the pool is a per-thread local property):

val spark = SparkSession.builder()
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
  .getOrCreate()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

Then you can run your jobs in parallel. Since you want to parallelize over CLASS, something like this works:

val classes = (1 to 7).par
val date = "2020-09-25"

classes foreach { i =>

    val df_filtered = df.filter(s"CLASS = '$i' and DATE = '$date'")

    // Do your job

}

This processes the different CLASS values at the same time.
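
If you are on PySpark rather than Scala, a comparable sketch uses a thread pool to submit the per-CLASS work concurrently (assuming a SparkSession named spark, df loaded from the root path as above, and the "fair" pool defined earlier):

from concurrent.futures import ThreadPoolExecutor

def process_class(i, date='2020-09-25'):
    # Local properties are per thread, so pick the pool inside the worker thread
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")
    df_filtered = df.filter("CLASS = '{}' and DATE = '{}'".format(i, date))
    # Do your job with df_filtered

with ThreadPoolExecutor(max_workers=7) as pool:
    list(pool.map(process_class, range(1, 8)))    # wait for all classes to finish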

Upvotes: 4
