Reputation: 1095
I am currently exploring Spark. I am faced with the following task - get an RDD, partition it based on a certain criterion and then write multiple files into different folders in an S3 bucket.
Everything is fine until it comes to the uploading-to-S3 part. I've read all the questions related to this problem on SO and found that I could use either the AmazonS3Client or the saveAsTextFile method on RDDs. There are two problems I face:
If I go with the AmazonS3Client, I get a java.io.NotSerializableException: since the code is sent from the Spark driver to the workers it needs to be serialized, and apparently the AmazonS3Client doesn't support that.
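For reference, this is roughly what the AmazonS3Client attempt looked like (simplified; the bucket name, key naming and the String-based putObject overload are just illustrative):
import com.amazonaws.services.s3.AmazonS3Client

// The client is built on the driver, so Spark tries to serialize it along
// with the closure it ships to the executors, which is what blows up.
val s3Client = new AmazonS3Client()

rdd.foreachPartition { p =>
  p.foreach { record =>
    s3Client.putObject("my-bucket", s"some/prefix/${record.hashCode}", record.toString)
  }
}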
If I go with saveAsTextFile I face a similar problem. Inside the foreachPartition loop I get an Iterator[T] (in this case p), so if I want to use saveAsTextFile I need to create an RDD from that iterator, hence the parallelize. The problem is that the SparkContext sc also (rightfully so) doesn't serialize.
rdd.foreachPartition { p =>
  sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://")
}
Any help will be greatly appreciated.
Upvotes: 3
Views: 1586
Reputation: 4927
There's no need to do that. You can just call saveAsTextFile directly on the rdd:
rdd.saveAsTextFile(s"s3n://dir/to/aux/file")
saveAsTextFile writes to S3 as a folder containing many part files (as many as there are partitions). Then you can merge them into a single file if you want:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.SparkContext

// copyMerge concatenates every part file under srcPath into dstPath;
// the `true` flag deletes the source folder once the merge is done.
def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = {
  val hadoopConfig = sc.hadoopConfiguration
  val fs = FileSystem.get(new URI(srcPath), hadoopConfig)
  FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null)
}
mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile", sc)
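If you still need one output folder per key, as described in the question, one simple (if not the most efficient) approach is to collect the distinct keys on the driver and call saveAsTextFile once per key. A rough sketch, assuming an RDD of (key, value) pairs named keyedRdd and a made-up bucket path:
val keys = keyedRdd.keys.distinct().collect()
keys.foreach { k =>
  // One Spark job per key; each writes its own folder of part files,
  // which can then be merged with mergeToS3 if needed.
  keyedRdd.filter { case (key, _) => key == k }
    .values
    .saveAsTextFile(s"s3n://bucket/output/$k")
}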
Upvotes: 5