Reputation: 65
I'm creating a Glue job that needs to process a daily data volume of 4 TB from the S3 path s3://<path>/<year>/<month>/<day>/<hour>/. I therefore created a loop that reads each hourly folder (about 155 GB) into a Spark DataFrame, filters for certain categories, and writes the result back to S3 as Parquet files partitioned by the filtered categories (s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/). I'm using 60 G.2X worker nodes, each with 8 vCPU, 32 GB of memory, and 128 GB of disk. The S3 writes are so slow that the job takes over 10 hours to finish. Is there a way to expedite/optimize the S3 writes apart from increasing the number of nodes?
# The Glue boilerplate that defines glueContext, logger, job and the
# get_df_from_s3 helper is defined elsewhere in the script.
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F


def s3_load_job(input_list):
    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")

    # get data from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")

    # user defined library function that returns a spark df
    df = get_df_from_s3(glueContext, s3_path)
    df = df.withColumn('category', F.lower(F.col('category')))
    df_rep = df.where(F.col('category').isin({"A", "B", "C", "D"}))

    # write to s3
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
    glueContext.write_dynamic_frame.from_options(
        frame=datasink4,
        connection_type="s3",
        connection_options={
            "path": "s3://<path>/",
            "partitionKeys": ["category", "year", "month", "day", "hour"],
        },
        format="glueparquet",
    )


def main():
    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]
    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")
    for i in input_list:
        s3_load_job(i)
    job.commit()


if __name__ == "__main__":
    main()
Upvotes: 4
Views: 9020
Reputation: 194
Try to make use of the Hadoop S3A committers: https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/committer_architecture.html
Configuration is not straightforward in Glue; I have written up the procedure here if you want: https://www.romainardiet.com/2022-06-21/aws-glue-and-s3a-committers/
This will make a big difference, as writing data no longer relies on a temporary S3 directory plus renames, which are slow on S3, especially when you write big files.
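For reference, a minimal sketch of the standard Spark/Hadoop properties involved, assuming the S3A filesystem and the spark-hadoop-cloud committer classes are already on the job's classpath (the Glue-specific wiring is what the post above covers):

from pyspark.sql import SparkSession

# Route Parquet writes through the S3A "magic" committer instead of the
# rename-based FileOutputCommitter.
s3a_committer_conf = {
    "spark.hadoop.fs.s3a.committer.name": "magic",
    "spark.hadoop.fs.s3a.committer.magic.enabled": "true",
    "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
    "spark.sql.sources.commitProtocolClass":
        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",
    "spark.sql.parquet.output.committer.class":
        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
}

builder = SparkSession.builder.appName("s3a-committer-sketch")
for key, value in s3a_committer_conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()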
Upvotes: 0
Reputation: 26
Seems like you must have figured out a way to handle this, but I would like to share what worked for me. I ran the Glue job every hour and enabled job bookmarking so that older files are not reprocessed. Make sure you are not creating too many partitions: that not only causes longer execution times, but if you query via Athena your queries could time out in the long run. Keep partitions to a minimum. With repartitioning, your job could spend too much time shuffling data, which could increase the job runtime. However, frequent hourly runs could help. Do share what worked for you.
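For illustration only, a sketch of what a coarser partitioning scheme could look like for the write in the question; partitioning only by category and day is a hypothetical choice, not a recommendation for this exact dataset:

# Hypothetical: fewer partition keys means far fewer S3 prefixes to create,
# list, and later scan from Athena.
glueContext.write_dynamic_frame.from_options(
    frame=datasink4,
    connection_type="s3",
    connection_options={
        "path": "s3://<path>/",
        "partitionKeys": ["category", "day"],
    },
    format="glueparquet",
)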
Upvotes: 1
Reputation: 5526
You can try the following:
If your partition size is 250 GB, then you should create output files of at least 256 MB each, or in the case of G.2X you can also create files of 512 MB each.
To achieve this, you can generate 500 files in each partition, as 500 * 512 MB = 250 GB:
df.repartition(500, partitionCol).write.partitionBy(partitionCol).parquet(path)
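Applied to the DataFrame from the question, a sketch might look like the following; the 500-files-per-partition target is this answer's sizing assumption, and df_rep, the partition columns, and the output path come from the question:

# Sketch: fix the number (and hence rough size) of output files per partition
# by repartitioning on the partition column before writing.
(
    df_rep
    .repartition(500, "category")
    .write
    .partitionBy("category", "year", "month", "day", "hour")
    .mode("append")
    .parquet("s3://<path>/")
)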
Upvotes: 0
Reputation: 870
If you're using S3 (Object Storage), try setting the following configurations:
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
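For example, a minimal sketch of setting both properties on the Spark session; prefixing the Hadoop keys with spark.hadoop. is the usual way to pass them through Spark:

from pyspark.sql import SparkSession

# The v2 commit algorithm skips the second, slow rename pass on S3, and
# ignoring cleanup failures stops slow temp-directory cleanup from failing the job.
spark = (
    SparkSession.builder
    .appName("fileoutputcommitter-v2-sketch")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true")
    .getOrCreate()
)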
Upvotes: 0