Reputation: 346
I have Spark 2.4.0 and Hadoop 3.1.1. According to the Hadoop documentation, to use the new magic committer, which allows Parquet files to be written to S3 consistently, I've set these values in conf/spark-defaults.conf:
spark.sql.sources.commitProtocolClass com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.hadoop.fs.s3a.committer.name magic
spark.hadoop.fs.s3a.committer.magic.enabled true
When using this configuration I end up with the exception:
java.lang.ClassNotFoundException: com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol
My question is twofold. First, do I understand correctly that Hadoop 3.1.1 allows Parquet files to be written to S3 consistently?
Second, if I understood correctly, how do I use the new committer properly from Spark?
Upvotes: 6
Views: 5076
Reputation: 1061
All the documentation on configuring the new committers that I've read so far is missing one fundamental fact:
They promise that those cloud integration libs will be bundled with Spark 3.0.0, but for now you have to add the libraries yourself.
Under the cloud integration Maven repos there are multiple distributions supporting the committers; I found one that works with the directory committer but not with the magic one.
In general, the directory committer is recommended over the magic committer, as it has been well tested and tried. It requires a shared filesystem such as HDFS or NFS (we use AWS EFS) to coordinate Spark worker writes to S3; the magic committer does not require one, but needs S3Guard.
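A quick way to see whether the committer classes are actually on the classpath is to try loading them. This is only a minimal sketch meant to be pasted into spark-shell or a Scala REPL: the helper name `classAvailable` is made up for illustration, the first class name is the one from the question, and the second is, as far as I know, the name under which the Apache Spark `spark-hadoop-cloud` module provides the same protocol, so treat that one as an assumption to verify against your build.

```scala
// Hypothetical helper: true if the named class can be loaded without initializing it.
def classAvailable(name: String): Boolean =
  try {
    Class.forName(name, false, Thread.currentThread().getContextClassLoader)
    true
  } catch {
    // Missing class, or class present but one of its dependencies is missing.
    case _: ClassNotFoundException | _: LinkageError => false
  }

val candidates = Seq(
  "com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol",  // from the question's config
  "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol",  // assumed Apache Spark name
  "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"           // from the question's config
)

candidates.foreach(c => println(s"$c -> available=${classAvailable(c)}"))
```

If the first two both report `available=false`, the `ClassNotFoundException` from the question is expected, and the missing JAR has to be added before any committer setting can take effect.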
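For reference, here is a rough sketch of the Hadoop-side options I would expect a directory committer setup to need, rendered as `spark-defaults.conf` lines. The option names are the S3A committer options from the Hadoop documentation; the `fail` conflict mode and the `/tmp/staging` path are only example values I picked, and `toSparkDefaults` is a hypothetical helper, not part of any library.

```scala
// Hadoop options for the "directory" staging committer (values are examples).
val directoryCommitter: Map[String, String] = Map(
  "mapreduce.outputcommitter.factory.scheme.s3a" ->
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
  "fs.s3a.committer.name"                  -> "directory",
  "fs.s3a.committer.staging.conflict-mode" -> "fail",          // assumed choice
  "fs.s3a.committer.staging.tmp.path"      -> "/tmp/staging"   // assumed path on the shared filesystem
)

// Hypothetical helper: Spark copies every spark.hadoop.* entry into the
// Hadoop configuration, so prefix each key and emit "key value" lines.
def toSparkDefaults(opts: Map[String, String]): Seq[String] =
  opts.toSeq.sortBy(_._1).map { case (k, v) => s"spark.hadoop.$k $v" }

toSparkDefaults(directoryCommitter).foreach(println)
```

In spark-shell the same map can be applied directly with `directoryCommitter.foreach { case (k, v) => sc.hadoopConfiguration.set(k, v) }`, without the `spark.hadoop.` prefix.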
Upvotes: 0
Reputation: 346
Edit:
OK, I have two server instances, one of them a bit old now. I tried the latest version of minio with these parameters:
sc.hadoopConfiguration.set("fs.s3a.path.style.access","true")
sc.hadoopConfiguration.set("fs.s3a.fast.upload","true")
sc.hadoopConfiguration.set("fs.s3a.fast.upload.buffer","bytebuffer")
sc.hadoopConfiguration.set("fs.s3a.multipart.size","128M")
sc.hadoopConfiguration.set("fs.s3a.fast.upload.active.blocks","4")
sc.hadoopConfiguration.set("fs.s3a.committer.name","partitioned")
I'm able to write so far without trouble.
However, my Swift server, which is a bit older and uses this config:
sc.hadoopConfiguration.set("fs.s3a.signing-algorithm","S3SignerType")
does not seem to support the partitioned committer properly.
Regarding "Hadoop S3Guard":
This is not currently possible. S3Guard, which keeps metadata about the S3 files, must be enabled in Hadoop, but S3Guard relies on DynamoDB, a proprietary Amazon service.
There is no alternative for now, such as a SQLite file or another DB system, to store the metadata.
So if you're using S3 with minio or any other S3 implementation, you're missing DynamoDB.
This article explains nicely how S3Guard works
Upvotes: 3
Reputation: 13430
Kiwy: that's my code, so I can help you with this. Some of the classes haven't made it into the ASF Spark releases, but you'll find them in the Hadoop JARs, and I could have a go at building the ASF release with the relevant dependencies in (I could put them in downstream; they used to be there).
You do not need S3Guard turned on to use the "staging committer"; it's only the "magic" variant which needs consistent object store listings during the commit phase.
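That rule can be written down as a small sketch. Both function names are hypothetical and only cover the three S3A committer names mentioned in this thread; falling back to `directory` is an illustrative choice, not a recommendation from the Hadoop docs.

```scala
// Only the "magic" committer needs consistent object store listings at
// commit time; the staging committers ("directory", "partitioned") do not.
def needsConsistentListing(committer: String): Boolean = committer match {
  case "magic"                     => true
  case "directory" | "partitioned" => false
  case other =>
    throw new IllegalArgumentException(s"committer not covered here: $other")
}

// Hypothetical helper: choose a value for fs.s3a.committer.name.
def pickCommitter(storeHasConsistentListing: Boolean, preferMagic: Boolean): String =
  if (preferMagic && storeHasConsistentListing) "magic" else "directory"

// A store without S3Guard-style consistent listings falls back to staging.
println(pickCommitter(storeHasConsistentListing = false, preferMagic = true)) // prints "directory"
```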
Upvotes: 1