zaxme

Reputation: 1095

Spark RDD foreachPartition to S3

I am currently exploring Spark. I am faced with the following task - take an RDD, partition it based on a certain criterion, and then write multiple files into different folders of an S3 bucket.

Everything is fine until we come to the uploading-to-S3 part. I've read all the questions related to this problem on SO and found that I could either use AmazonS3Client or the saveAsTextFile method for RDDs. There are two problems I face:

  1. If I go with AmazonS3Client I get a java.io.NotSerializableException: since the code is shipped from the Spark driver to the workers it has to be serialized, and apparently AmazonS3Client doesn't support that (the usual workaround is sketched after the code below).

  2. If I go with saveAsTextFile I face a similar problem. Inside the foreachPartition loop I get an Iterator[T] (in this case p), so if I want to use saveAsTextFile I need to create an RDD from that iterator, hence the parallelize. The problem is that the SparkContext sc also (rightfully so) doesn't serialize.

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }
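
For reference, the usual workaround for problem 1 is to build the client inside foreachPartition, on the worker, so the closure never captures anything non-serializable. A minimal sketch, assuming the AWS SDK v1 builder; the bucket name and key layout are placeholders, not part of the original post:

import com.amazonaws.services.s3.AmazonS3ClientBuilder

rdd.foreachPartition { partition =>
  // Construct the client on the executor, once per partition -- it never crosses the wire.
  val s3 = AmazonS3ClientBuilder.defaultClient()
  // Upload the whole partition as a single object; "my-bucket" and the key are placeholders.
  val key = s"some/prefix/part-${java.util.UUID.randomUUID()}"
  s3.putObject("my-bucket", key, partition.mkString("\n"))
}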

Any help will be greatly appreciated.

Upvotes: 3

Views: 1586

Answers (1)

Marco

Reputation: 4927

There's no need to do that. You can just call saveAsTextFile on the rdd:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file")

saveAsTextFile will write to a folder on S3 containing many parts of the file (as many parts as partitions). Then you can merge them into a single file if you want:

  import java.net.URI

  import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
  import org.apache.spark.SparkContext

  def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
    val hadoopConfig = sc.hadoopConfiguration
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
    // copyMerge concatenates all part files under srcPath into the single file dstPath;
    // the `true` flag deletes the source folder afterwards.
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
  }

  mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile", sc)
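
If you also need one folder per criterion, as in the original question, here is a minimal sketch that sticks to plain saveAsTextFile, one call per category. It assumes an RDD[String] and a criterion you can compute per record; categoryOf and the base path are hypothetical placeholders:

  import org.apache.spark.rdd.RDD

  // Hypothetical criterion: here, the first field of a comma-separated record.
  def categoryOf(record: String): String = record.split(",").head

  def saveByCategory(rdd: RDD[String], baseDir: String): Unit = {
    val keyed = rdd.map(r => (categoryOf(r), r)).cache()
    val categories = keyed.keys.distinct().collect()
    // One folder per category, written with plain saveAsTextFile --
    // no S3 client and no SparkContext on the workers.
    categories.foreach { c =>
      keyed.filter { case (k, _) => k == c }.values.saveAsTextFile(s"$baseDir/$c")
    }
    keyed.unpersist()
  }

  saveByCategory(rdd, "s3n://dir/to/output")

This collects only the distinct category keys to the driver and runs one write job per category, which is reasonable when the number of categories is small.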

Upvotes: 5
