Reputation: 31
After writing a Spark DataFrame to a file, I am attempting to rename the file using code like the following:
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file = fs.globStatus(new Path(path + "/part*"))(0).getPath().getName()
fs.rename(new Path(path + "/" + file), new Path(path + "/" + fileName))
This works great when running Spark locally. However, when I run my jar on Dataproc, I get an error like the one below:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-***, in path: gs://prj-*****/part*, expected bucket: dataproc-temp-***
It seems files may not be saved to the target bucket until the end of the job, which would explain why I am struggling to rename them. I have tried using .option("mapreduce.fileoutputcommitter.algorithm.version", "2"), as I read something about this that looked promising.
Update:
Still no luck. It seems that spark.sparkContext.hadoopConfiguration expects the base bucket to be a dataproc-temp-* bucket. Full stack trace below:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong bucket: prj-**, in path: gs://p**, expected bucket: dataproc-temp-u***
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:95)
at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:667)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:394)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:149)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1085)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1059)
Upvotes: 2
Views: 472
Reputation: 4457
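The HCFS (Hadoop Compatible File System) instance returned by the FileSystem.get(...) call is tied to one specific file system (in this case, a single GCS bucket). When called with only a Configuration, it returns the default file system. By default, Dataproc Serverless Spark is configured to use the gs://dataproc-temp-*/ bucket as the default HCFS via the spark.hadoop.fs.defaultFS Spark property, so paths in any other bucket are rejected with the "Wrong bucket" error.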
To solve this issue, you need to create the HCFS instance for the target path's own URI using the FileSystem#get(URI uri, Configuration conf) call:
// path is a String in the question, so wrap it in a Path to obtain its URI
val fs = FileSystem.get(new Path(path).toUri, spark.sparkContext.hadoopConfiguration)
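For completeness, here is a minimal sketch of the whole rename step from the question, rewritten around a file system bound to the target path. The helper name renamePartFile and its parameters are hypothetical (not from the original post), and it assumes the output directory contains at least one part* file to rename.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: renames the first part* file under `dir` to `fileName`.
// Returns false if no part file is found or the rename itself fails.
def renamePartFile(dir: String, fileName: String, conf: Configuration): Boolean = {
  val dirPath = new Path(dir)
  // Bind the FileSystem to the URI of the target path (e.g. its gs:// bucket)
  // instead of relying on fs.defaultFS.
  val fs = FileSystem.get(dirPath.toUri, conf)
  val parts = fs.globStatus(new Path(dirPath, "part*"))
  // Guard against a null or empty glob result before indexing into it.
  if (parts == null || parts.isEmpty) false
  else fs.rename(parts(0).getPath, new Path(dirPath, fileName))
}
```

With the variables from the question, this could be called as renamePartFile(path, fileName, spark.sparkContext.hadoopConfiguration). An alternative with the same effect is new Path(path).getFileSystem(conf), which also resolves the file system from the path's own scheme and authority.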
Upvotes: 2