revy

Reputation: 4707

Spark: how to write dataframe to S3 efficiently

I am trying to figure out the best way to write data to S3 using (Py)Spark.

I have no problem reading from the S3 bucket, but writing to it is really slow.

I've started the PySpark shell like this (including the hadoop-aws package):

AWS_ACCESS_KEY_ID=<key_id> AWS_SECRET_ACCESS_KEY=<secret_key> pyspark --packages org.apache.hadoop:hadoop-aws:3.2.0
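
For reference, the equivalent setup when building the SparkSession directly would look roughly like this (the credential keys assume the S3A connector from hadoop-aws; the placeholders are the same as above):

from pyspark.sql import SparkSession

# Sketch: pull in hadoop-aws and hand the S3A connector its credentials programmatically
spark = (SparkSession.builder
         .appName('s3-write-example')
         .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')
         .config('spark.hadoop.fs.s3a.access.key', '<key_id>')
         .config('spark.hadoop.fs.s3a.secret.key', '<secret_key>')
         .getOrCreate())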

This is the sample application:

# Load several csv files from S3 to a Dataframe (no problems here)
df = spark.read.csv(path='s3a://mybucket/data/*.csv', sep=',')
df.show()

# Some processing
result_df = do_some_processing(df)
result_df.cache()
result_df.show()

# Write to S3
result_df.write.partitionBy('my_column').csv(path='s3a://mybucket/output', sep=',')  # This is really slow

When I try to write to S3, I get the following warning:

20/10/28 15:34:02 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.

Is there any setting I should change to make writes to S3 efficient? Right now it is really slow: it took about 10 minutes to write 100 small files to S3.

Upvotes: 3

Views: 13486

Answers (1)

revy

Reputation: 4707

It turns out you have to specify the committer manually, otherwise the default FileOutputCommitter is used, which is not optimized for S3 (it commits work by renaming files, and a rename on an object store is a slow copy-and-delete):

(result_df
    .write
    .partitionBy('my_column')
    .option('fs.s3a.committer.name', 'partitioned')
    .option('fs.s3a.committer.staging.conflict-mode', 'replace')
    # Buffer uploads in memory instead of on disk: potentially faster but more memory intensive
    .option('fs.s3a.fast.upload.buffer', 'bytebuffer')
    .mode('overwrite')
    .csv(path='s3a://mybucket/output', sep=','))
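
If you want this behavior for every write rather than per call, the same settings can presumably be set once on the session instead; a minimal sketch, assuming the spark.hadoop.* prefix to forward them to the Hadoop configuration:

from pyspark.sql import SparkSession

# Sketch: configure the S3A committer once for the whole session
spark = (SparkSession.builder
         .config('spark.hadoop.fs.s3a.committer.name', 'partitioned')
         .config('spark.hadoop.fs.s3a.committer.staging.conflict-mode', 'replace')
         .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer')
         .getOrCreate())

# Writes then no longer need the per-call options
result_df.write.partitionBy('my_column').mode('overwrite').csv(path='s3a://mybucket/output', sep=',')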

Relevant documentation: the Hadoop-AWS guide "Committing work to S3 with the S3A Committers".

Upvotes: 5
