fez

Reputation: 1835

speeding up heavily partitioned dataframe to s3 on databricks

I'm running a notebook on Databricks which creates partitioned PySpark data frames and uploads them to s3. The table in question has ~5,000 files and is ~5 GB in total size (it needs to be partitioned in this way to be effectively queried by Athena). My issue is that the writing of files to s3 seems to be sequential rather than parallel and can take up to one hour. For example:

(df.repartition("customer_id")
  .write.partitionBy("customer_id")
  .mode("overwrite")
  .format("parquet")
  .save("s3a://mybucket/path-to-table/"))

I have launched my cluster (i3.xlarge) on AWS with the following config:

spark.hadoop.orc.overwrite.output.file true
spark.databricks.io.directoryCommit.enableLogicalDelete true
spark.sql.sources.commitProtocolClass org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
parquet.enable.summary-metadata false
spark.hadoop.fs.s3.maxRetries 20
spark.databricks.hive.metastore.glueCatalog.enabled true
spark.hadoop.validateOutputSpecs false
mapreduce.fileoutputcommitter.marksuccessfuljobs false
spark.sql.legacy.parquet.datetimeRebaseModeInRead CORRECTED
spark.hadoop.fs.s3.consistent.retryPeriodSeconds 10
spark.speculation true
spark.hadoop.fs.s3.consistent true
spark.hadoop.fs.s3.consistent.retryCount 5

What's the recommended approach in this case where I have many small files that I need to be written to s3 quickly?

Upvotes: 2

Views: 2244

Answers (2)

Douglas M

Reputation: 1126

I see several reasons your write is slow and can be sped up:

  1. You may have over 5,000 customers? If so, the partitionBy gives you more than 5,000 partitions, which is very slow with plain Parquet (non Delta Lake) tables due to metastore overhead. I don't think you want that many partitions (a quick sanity check is sketched after this list).
  2. With 5,000 files for 5 GB, each file is about 1 MB in size. That is very small; for this problem the files you write out should be closer to 100 MB.
  3. The default cluster options are very well engineered; I very infrequently need to change them, and when I do it is to enable new features. Besides addressing the items above, try removing all of these configuration overrides.
  4. repartition("customer_id") and partitionBy("customer_id") are redundant.
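
A quick sanity check for item 1, as a sketch reusing the df from your snippet:

# One partition directory is created per distinct customer_id value.
num_customers = df.select("customer_id").distinct().count()
print(f"partitionBy('customer_id') would create {num_customers} directories")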

Recommend:

  1. Get file sizes up to ~100 MB; you can do this with coalesce() if your previous stage created more than 50 partitions.
  2. Get rid of the partitioning by customer_id. You may think there are good reasons for it, but the small files and the large number of partitions are killing your performance.
  3. Try the open Delta Lake format (e.g. CREATE TABLE ... USING DELTA LOCATION ...). It will speed up your customer-selective queries, joins on customer_id will also be faster if you OPTIMIZE ... ZORDER BY customer_id, and it can auto optimize the size of your files (a sketch of these commands follows this list).
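
A rough sketch of item 3 in a notebook, assuming the path from your question and an illustrative table name my_table (both placeholders):

# Register a Delta table over the existing location.
spark.sql("""
  CREATE TABLE IF NOT EXISTS my_table
  USING DELTA
  LOCATION 's3a://mybucket/path-to-table/'
""")

# Compact small files and co-locate rows with the same customer_id.
spark.sql("OPTIMIZE my_table ZORDER BY (customer_id)")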

The final result looks much cleaner:

(df.coalesce(50)
  .write
  .mode("overwrite")
  .format("delta")
  .save("s3a://mybucket/path-to-table/"))

See the auto optimize options to automate the file sizing: https://docs.databricks.com/delta/optimizations/auto-optimize.html#usage
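
If you adopt Delta, auto optimize can be turned on per table with table properties, roughly like this (a sketch; my_table is the placeholder name from above):

# Have Databricks bin writes into reasonably sized files and compact small ones.
spark.sql("""
  ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
  )
""")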

Delta Lake tables can also be used with Athena: https://docs.databricks.com/delta/presto-integration.html#presto-and-athena-to-delta-lake-integration
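
The Athena integration in that link reads a symlink manifest rather than the Delta transaction log; generating it looks roughly like this (a sketch, reusing the placeholder path):

from delta.tables import DeltaTable

# Produce the manifest files Athena/Presto use to locate the table's data files.
delta_table = DeltaTable.forPath(spark, "s3a://mybucket/path-to-table/")
delta_table.generate("symlink_format_manifest")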

Upvotes: 2

David Goodfellow

Reputation: 31

For the S3A connector, have you set fs.s3a.fast.upload = true? I am looking at a similar ticket at this link.
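
For reference, a sketch of setting that flag on a running session (equivalent to spark.hadoop.fs.s3a.fast.upload true in the cluster config; it only applies to paths using the s3a:// scheme):

# Enable the S3A fast upload path for the current Spark session.
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.fast.upload", "true")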

Upvotes: 1
