Reputation: 1835
I'm running a notebook on Databricks which creates partitioned PySpark DataFrames and uploads them to S3. The table in question has ~5,000 files and is ~5 GB in total size (it needs to be partitioned this way to be queried effectively by Athena). My issue is that writing the files to S3 appears to be sequential rather than parallel and can take up to one hour. For example:
(df.repartition("customer_id")
    .write.partitionBy("customer_id")
    .mode("overwrite")
    .format("parquet")
    .save("s3a://mybucket/path-to-table/"))
I have launched my cluster (i3.xlarge) on AWS with the following config:
spark.hadoop.orc.overwrite.output.file true
spark.databricks.io.directoryCommit.enableLogicalDelete true
spark.sql.sources.commitProtocolClass org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
parquet.enable.summary-metadata false
spark.hadoop.fs.s3.maxRetries 20
spark.databricks.hive.metastore.glueCatalog.enabled true
spark.hadoop.validateOutputSpecs false
mapreduce.fileoutputcommitter.marksuccessfuljobs false
spark.sql.legacy.parquet.datetimeRebaseModeInRead CORRECTED
spark.hadoop.fs.s3.consistent.retryPeriodSeconds 10
spark.speculation true
spark.hadoop.fs.s3.consistent true
spark.hadoop.fs.s3.consistent.retryCount 5
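(On Databricks these keys go in the cluster's Spark config as above; for completeness, on a plain Spark deployment the same keys could be passed when the session is built. A minimal sketch with two of the keys from the list:)

from pyspark.sql import SparkSession

# Minimal sketch of passing the same config keys programmatically on a
# plain Spark deployment (not needed on Databricks, where the cluster
# Spark config above applies them at startup).
spark = (SparkSession.builder
    .config("spark.hadoop.fs.s3.maxRetries", "20")
    .config("spark.speculation", "true")
    .getOrCreate())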
What's the recommended approach in this case, where I have many small files that need to be written to S3 quickly?
Upvotes: 2
Views: 2244
Reputation: 1126
I see several reasons why your write is slow and how it can be sped up:
Recommend: convert the table to Delta Lake and register it with
CREATE TABLE ... USING DELTA LOCATION ...
This will speed up your customer-selective queries; joins on customer_id will also be sped up if you run OPTIMIZE ... ZORDER BY customer_id (a sketch follows the code below), and auto optimize can keep your file sizes in check. The final result looks much cleaner:
(df.coalesce(50)
    .write
    .mode("overwrite")
    .format("delta")
    .save("s3a://mybucket/path-to-table/"))
See the auto optimize options for automating the file sizing: https://docs.databricks.com/delta/optimizations/auto-optimize.html#usage
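A sketch of enabling those auto optimize options as table properties, per the linked docs (same hypothetical table name):

# Sketch of the auto optimize table properties from the linked docs.
spark.sql("""
    ALTER TABLE customers SET TBLPROPERTIES (
        delta.autoOptimize.optimizeWrite = true,
        delta.autoOptimize.autoCompact = true
    )
""")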
Delta Lake tables can be used with Athena: https://docs.databricks.com/delta/presto-integration.html#presto-and-athena-to-delta-lake-integration
Upvotes: 2
Reputation: 31
On the S3 bucket, have you set fs.s3a.fast.upload = true? I am looking at a similar ticket at this link.
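For reference, a minimal sketch of how that flag can be set (the spark.hadoop. prefix passes it through to the Hadoop config; it assumes the s3a connector, which your s3a:// path uses):

from pyspark.sql import SparkSession

# Sketch: enable the s3a fast upload path (buffers and uploads blocks in
# parallel) via the spark.hadoop.-prefixed Hadoop key.
spark = (SparkSession.builder
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .getOrCreate())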
Upvotes: 1