Rubber Duck

Reputation: 3723

Using Spark on Dataproc, how to write to GCS separately from each partition?

Using Spark on GCP Dataproc, I successfully write an entire RDD to GCS like so:

rdd.saveAsTextFile(s"gs://$path")

The result is one file per partition, all written under the same path.

How do I write a separate file for each partition, with a unique path based on information from that partition?

Below is an invented, non-working, wishful code example:

    rdd.mapPartitionsWithIndex(
      (i, partition) =>{

        partition.write(path = s"gs://partition_$i", data = partition_specific_data)
      }
    )

When I call the function below from within a partition, it writes to local disk on my Mac; on Dataproc I get an error that does not recognize gs as a valid path.

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

def writeLocally(filePath: String, data: Array[Byte], errorMessage: String): Unit = {

  println("Juicy Platform")

  val path = new Path(filePath)

  var ofos: Option[FSDataOutputStream] = None

  try {
    println(s"\nTrying to write to $filePath\n")

    val conf = new Configuration()

    conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

    // conf.addResource(new Path("/home/hadoop/conf/core-site.xml"))

    println(conf.toString)

    val fs = FileSystem.get(conf)

    val fos = fs.create(path)
    ofos = Option(fos)

    fos.write(data)

    println(s"\nWrote to $filePath\n")
  }
  catch {
    case e: Exception =>
      // logError is an application-specific logging helper
      logError(errorMessage, s"Exception occurred writing to GCS:\n${ExceptionUtils.getStackTrace(e)}")
  }
  finally {
    ofos match {
      case Some(i) => i.close()
      case _ =>
    }
  }
}

This is the error:

java.lang.IllegalArgumentException: Wrong FS: gs://path/myFile.json, expected: hdfs://cluster-95cf-m

Upvotes: 2

Views: 3018

Answers (1)

Dennis Huo

Reputation: 10677

If running on a Dataproc cluster, you shouldn't need to explicitly populate "fs.gs.impl" in the Configuration; a new Configuration() should already contain the necessary mappings.

The main problem here is that val fs = FileSystem.get(conf) uses the fs.defaultFS property of the conf; it has no way of knowing that you wanted a FileSystem instance for GCS rather than HDFS. In general, in Hadoop and Spark, a FileSystem instance is fundamentally tied to a single URL scheme; you need to fetch a scheme-specific instance for each different scheme, such as hdfs:// or gs:// or s3://.
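Roughly, the difference looks like this (the bucket name here is just a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()

// Resolves against fs.defaultFS -- on a Dataproc cluster that's HDFS,
// so this instance will reject gs:// paths with "Wrong FS".
val defaultFs: FileSystem = FileSystem.get(conf)

// Resolves against the scheme of the Path itself, so this returns a
// GoogleHadoopFileSystem instance that can handle gs:// paths.
val gcsFs: FileSystem = new Path("gs://my-bucket/some/object").getFileSystem(conf)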

The simplest fix is to always use Path.getFileSystem(Configuration) instead of FileSystem.get(Configuration), and to make sure your path is fully qualified with its scheme:

...
val path = new Path("gs://bucket/foo/data")
val fs = path.getFileSystem(conf)

val fos = fs.create(path)
ofos = Option(fos)

fos.write(data)
...
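Putting that back into the per-partition write from your question, a rough sketch could look like the following (the bucket and output path are made up, and records are just written with toString; adapt the serialization to your data):

import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

rdd.mapPartitionsWithIndex { (i, partition) =>
  // Configuration isn't serializable, so build it inside the closure, on the executor.
  val conf = new Configuration()
  val path = new Path(s"gs://my-bucket/output/partition_$i")
  val fs = path.getFileSystem(conf) // scheme-specific FileSystem for gs://
  val out = fs.create(path)
  try {
    partition.foreach(record => out.write((record.toString + "\n").getBytes(StandardCharsets.UTF_8)))
  } finally {
    out.close()
  }
  Iterator.single(path.toString)
}.count() // mapPartitionsWithIndex is lazy; an action forces the writes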

Upvotes: 3
