Reputation: 2404
I have an RDD which is too big to collect. I have applied a chain of transformations to the RDD and want to send its transformed data directly from its partitions on my slaves to S3. I am currently operating as follows:
val rdd = initializeRDD
val rdd2 = rdd.transform
rdd2.first // force evaluation of the transformed RDD
rdd2.sendPartitionsToS3(prefix)
Unfortunately, the data that gets sent to S3 is untransformed. The RDD looks exactly like it did in the initializeRDD stage.
Here is the implementation of sendPartitionsToS3:
import java.io.{File, PrintWriter}
import org.apache.spark.rdd.RDD

implicit class WriteableRDD[T](rdd: RDD[T]) {

  def transform: RDD[String] = rdd map { _.toString }

  ....

  def sendPartitionsToS3(prefix: String) = {
    rdd.foreachPartition { p =>
      // Write the partition to a local file, then upload that file to S3.
      val filename = prefix + new scala.util.Random().nextInt(1000000)
      val pw = new PrintWriter(new File(filename))
      p foreach pw.println
      pw.close()
      s3.putObject(S3_BUCKET, filename, new File(filename))
    }
    this
  }
}
This is called with rdd.transform.sendPartitionsToS3(prefix).
How do I make sure the data that gets sent in sendPartitionsToS3 is the transformed data?
Upvotes: 3
Views: 166
Reputation: 27455
My guess is that there is a bug in your code that is not included in the question. I'm answering anyway, just to make sure you are aware of RDD.saveAsTextFile. You can give it a path on S3 (s3n://bucket/directory) and it will write each partition into that path directly from the executors.
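For example, here is a minimal sketch, assuming the transform from your question is in scope, the executors' Hadoop configuration already carries the S3 credentials, and the bucket and directory names are placeholders:

// Sketch: rdd and transform come from the question; the path is a placeholder.
rdd.transform.saveAsTextFile("s3n://my-bucket/my-output-directory")

Each partition is written as its own part-NNNNN file under that directory, which is essentially what sendPartitionsToS3 reimplements by hand.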
I can hardly imagine when you would need to implement your own sendPartitionsToS3 instead of using saveAsTextFile.
Upvotes: 3