Gautham Acharya

Reputation: 153

Apache Spark - Write Parquet Files to S3 with both Dynamic Partition Overwrite and S3 Committer

I'm currently building an application with Apache Spark (pyspark), and I have the following use case:

- Write partitioned Parquet tables to S3 (in my case, an on-prem Ceph instance exposing the S3 API).
- Overwrite individual partitions dynamically, without rewriting the entire table.
- Stage output files on local disk rather than in S3.
- Commit to S3 with an S3 committer, avoiding expensive rename-based commits.

For various internal reasons, all four of the above bullet points are non-negotiable.

I have everything but the last bullet point working. I'm running a pyspark application, and writing to S3 (actually an on-prem Ceph instance), ensuring that spark.sql.sources.partitionOverwriteMode is set to dynamic.

However, this means that my Spark staging files are written to S3 and then committed with delete-and-rename operations, which are very expensive on an object store.

I've tried using the Spark Directory Committer in order to stage files on my local disk. This works great unless spark.sql.sources.partitionOverwriteMode is set to dynamic.
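The directory committer setup I'm referring to looks roughly like this (the fs.s3a.* property name is from the Hadoop S3A committer documentation; treat it as a sketch rather than my exact config):

        # Sketch: the S3A "directory" staging committer stages task output on
        # the local filesystem instead of writing it directly to S3.
        self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "directory")

        # Cloud committer bindings from spark-hadoop-cloud.
        self.spark.conf.set("spark.sql.sources.commitProtocolClass",
                            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
        self.spark.conf.set("spark.sql.parquet.output.committer.class",
                            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")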

After digging through the source code, it looks like the PathOutputCommitter does not support Dynamic Partition Overwriting.

At this point, I'm stuck. I want to be able to write my staging files to local disk, and then commit the results to S3. However, I also need to be able to dynamically overwrite a single partition without overwriting the entire Parquet table.

For reference, I'm running pyspark=3.1.2, and using the following spark-submit command:

spark-submit --repositories https://repository.cloudera.com/artifactory/cloudera-repos/ --packages com.amazonaws:aws-java-sdk:1.11.375,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.spark:spark-hadoop-cloud_2.12:3.1.1.3.1.7270.0-253

I get the following error when spark.sql.sources.partitionOverwriteMode is set to dynamic:

java.io.IOException: PathOutputCommitProtocol does not support dynamicPartitionOverwrite

My spark config is as follows:


        self.spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
        self.spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

        self.spark.conf.set("spark.hadoop.fs.s3a.committer.name", "magic")

        self.spark.conf.set("spark.sql.sources.commitProtocolClass",
                            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")

        self.spark.conf.set("spark.sql.parquet.output.committer.class",
                            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")

        self.spark.conf.set(
            "spark.sql.sources.partitionOverwriteMode", "dynamic"
        )
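For completeness, the write that triggers the error looks roughly like this (the path and partition column are placeholders):

        # Placeholder write: with partitionOverwriteMode=dynamic and the
        # PathOutputCommitProtocol above, this raises the IOException shown earlier.
        df.write \
            .mode("overwrite") \
            .partitionBy("report_date") \
            .parquet("s3a://my-bucket/path/to/table")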

Upvotes: 5

Views: 4165

Answers (1)

stevel

Reputation: 13470

I'm afraid the S3A committers don't support the dynamic partition overwrite feature. That feature works by doing lots of renaming, so it misses the entire point of zero-rename committers.

the "partioned" committer was written by netflix for their use case of updating/overwriting single partitions in an active table. it should work for you as it is the same use case.

Consult the Hadoop S3A committer documentation for the configuration details.

Upvotes: 3
