Narmadha

Reputation: 65

How to reduce the time taken to write parquet files to s3 using AWS Glue

I'm creating a Glue job that needs to process a daily data volume of 4 TB from the S3 path s3://<path>/<year>/<month>/<day>/<hour>/. So I created a loop that reads each hourly folder (155 GB each) into a Spark DataFrame, filters for certain categories, and writes back to S3 as Parquet files partitioned by the filtered categories (s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/). I'm using 60 G.2X workers, each with 8 vCPUs, 32 GB of memory, and 128 GB of disk. The S3 writes are so slow that the job takes over 10 hours to finish. Is there a way to expedite/optimize the S3 writes other than increasing the number of workers?


from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

# glueContext, logger, job and get_df_from_s3 come from the standard Glue job
# setup and user library code not shown here.


def s3_load_job(input_list):

    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")

    # read one hourly folder from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"s3 path {s3_path}")

    # user-defined library function that returns a spark df
    df = get_df_from_s3(glueContext, s3_path)

    df = df.withColumn('category', F.lower(F.col('category')))

    df_rep = df.where(F.col('category').isin(["A", "B", "C", "D"]))

    # write back to s3, partitioned by category/year/month/day/hour
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")

    glueContext.write_dynamic_frame.from_options(
        frame=datasink4,
        connection_type="s3",
        connection_options={
            "path": "s3://<path>/",
            "partitionKeys": ["category", "year", "month", "day", "hour"]
        },
        format="glueparquet")


def main():

    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]

    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")

    for i in input_list:
        s3_load_job(i)

    job.commit()


if __name__ == "__main__":
    main()

Upvotes: 4

Views: 9020

Answers (4)

Romibuzi

Reputation: 194

Try to make use of the Hadoop S3A committers: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/committer_architecture.html

Configuration is not straightforward in Glue; I have written up the procedure here if you want it: https://www.romainardiet.com/2022-06-21/aws-glue-and-s3a-committers/

This makes a big difference, because writing data no longer relies on a temporary S3 directory plus renames, which are slow on S3, especially when writing big files.
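For reference, a minimal sketch of the Spark-side settings from the Hadoop documentation, assuming the spark-hadoop-cloud committer classes are available on the Glue classpath (the Glue-specific packaging is what the linked post covers):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    conf = SparkConf()
    # Use an S3A committer ("directory", "partitioned" or "magic") instead of
    # the rename-based FileOutputCommitter
    conf.set("spark.hadoop.fs.s3a.committer.name", "partitioned")
    # Route Spark's commit protocol through the path-output committers
    conf.set("spark.sql.sources.commitProtocolClass",
             "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    conf.set("spark.sql.parquet.output.committer.class",
             "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)

As far as I know, these committers only kick in for writes that go through the s3a:// filesystem and the Spark DataFrame writer, not through Glue's own s3 connection type; the linked post goes into those details.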

Upvotes: 0

Fraser Sequeira

Reputation: 26

Seems like you must have figured out a way to handle this. Would like to share what worked for me. I ran the glue job every hour, enabled job bookmarking to to not reprocess older files. Ensure that you are not creating too many partitions that not only causes a longer execution time but incase you want to query via Athena your queries could timeout in the long run. Keep partitions to a minimum. With repartitioning your job could spend too much time shuffling data that could increase the job runtime. However frequent hourly runs could help. Do share what worked for you.
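For context, a rough sketch of what enabling bookmarks involves on the script side (bookmarks also need the job parameter --job-bookmark-option set to job-bookmark-enable, and they only track sources read through Glue's readers with a transformation_ctx):

    import sys
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    glueContext = GlueContext(SparkContext.getOrCreate())
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)

    # read with glueContext.create_dynamic_frame.from_options(..., transformation_ctx="src"),
    # filter and write as in the question ...

    job.commit()  # records the bookmark so the next hourly run skips already-processed files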

Upvotes: 1

Shubham Jain

Reputation: 5526

You can try the following:

  1. Don't convert the PySpark DataFrame to a DynamicFrame, as you can save the PySpark DataFrame directly to S3.
  2. Since you are getting files of only 1 MB to 15 MB each, you need to optimize this, so try repartitioning the DataFrame before writing it to S3.

If your partition size is 250 GB, then you should create output files of at least 256 MB, or in the case of G.2X workers you can create files of 512 MB each.

To achieve this, you can generate 500 files in each partition, since 500 * 512 MB ≈ 250 GB:

df.repartition(500, partitionCol).write.partitionBy(partitionCol).parquet(path)
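Applied to the question's code, that might look like the sketch below (assuming df_rep already carries the category/year/month/day/hour columns it is partitioned on; the repartition count of 500 and the append mode are assumptions to tune for your volumes):

    partition_cols = ["category", "year", "month", "day", "hour"]

    # Write the Spark DataFrame directly instead of converting to a DynamicFrame,
    # repartitioning first so each partition gets a few large Parquet files
    # rather than thousands of tiny ones.
    (df_rep
        .repartition(500, *partition_cols)   # tune 500 to target ~256-512 MB files
        .write
        .mode("append")
        .partitionBy(*partition_cols)
        .parquet("s3://<path>/"))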

Upvotes: 0

Nir Hedvat

Reputation: 870

If you're using S3 (Object Storage), try setting the following configurations:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
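One way to apply these in a Glue script is to set them on the SparkConf before creating the GlueContext (a sketch; the spark.hadoop. prefix is what pushes the second key into the Hadoop configuration):

    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext

    conf = SparkConf()
    # Ignore failures while cleaning up the temporary output directory
    conf.set("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
    # Algorithm v2 commits task output directly, avoiding the extra rename pass at job commit
    conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

    glueContext = GlueContext(SparkContext(conf=conf))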

Upvotes: 0
