Walrus the Cat

Reputation: 2404

How to send transformed data from partitions to S3?

I have an RDD that is too big to collect. I have applied a chain of transformations to it and want to send the transformed data directly from its partitions on my slaves to S3. I am currently operating as follows:

val rdd:RDD = initializeRDD
val rdd2 = rdd.transform
rdd2.first // in order to force calculation of RDD
rdd2.foreachPartition sendDataToS3

Unfortunately, the data that gets sent to S3 is untransformed. The RDD looks exactly like it did right after initializeRDD.

Here is the body of sendDataToS3:

implicit class WriteableRDD[T](rdd: RDD[T]) {

  def transform: RDD[String] = rdd map {_.toString}

  ....

  def sendPartitionsToS3(prefix: String) = {
    rdd.foreachPartition { p =>
      val filename = prefix + new scala.util.Random().nextInt(1000000)
      val pw = new PrintWriter(new File(filename))
      p foreach pw.println
      pw.close
      s3.putObject(S3_BUCKET, filename, new File(filename))
    }
    this
  }

}

This is called with rdd.transform.sendPartitionsToS3(prefix).

How do I make sure the data that gets sent in sendDataToS3 is the transformed data?

Upvotes: 3

Views: 166

Answers (1)

Daniel Darabos

Reputation: 27455

My guess is there is a bug in your code that is not included in the question.
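One thing that may be worth ruling out, purely as a guess since the relevant code is not shown: RDDs are immutable, so a transformation returns a new RDD and leaves the original untouched. Running an action on the original RDD by mistake would produce untransformed output. A minimal sketch, assuming a local Spark installation; the object name, app name, and the local[2] master are placeholders for illustration only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; not taken from the question's code.
object TransformReturnsNewRdd {
  def main(args: Array[String]): Unit = {
    // local[2] is an assumption so the sketch can run without a cluster.
    val conf = new SparkConf().setAppName("transform-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(Seq(1, 2, 3), 2)
      // map does not modify rdd; it returns a new RDD describing the transformation.
      val rdd2 = rdd.map(_ * 10)

      println(rdd.collect().mkString(","))  // prints 1,2,3 (original is unchanged)
      println(rdd2.collect().mkString(",")) // prints 10,20,30
    } finally {
      sc.stop()
    }
  }
}
```

Note also that foreachPartition is itself an action, so it triggers the computation on its own; a preceding call to first is not needed to force evaluation.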

I'm answering anyway just to make sure you are aware of RDD.saveAsTextFile. You can give it a path on S3 (s3n://bucket/directory) and it will write each partition into that path directly from the executors.

I can hardly imagine when you would need to implement your own sendPartitionsToS3 instead of using saveAsTextFile.
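A minimal sketch of the saveAsTextFile approach, assuming the job is launched with spark-submit and that S3 credentials are already configured for the cluster. The bucket and directory in the default path, the object name, and the sample data are placeholders, not anything from the question:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job object for illustration.
object SaveTransformedToS3 {
  def main(args: Array[String]): Unit = {
    // "my-bucket/output" is a placeholder; pass the real path as the first argument.
    val outputPath = args.headOption.getOrElse("s3n://my-bucket/output")

    // The master URL is expected to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("save-transformed-to-s3"))
    try {
      // Stand-in for initializeRDD from the question.
      val rdd = sc.parallelize(1 to 100, numSlices = 4)

      // Same transformation as in the question: convert each element to a String.
      val transformed = rdd.map(_.toString)

      // Each executor writes its own partitions as part-NNNNN files under outputPath;
      // nothing is collected to the driver.
      transformed.saveAsTextFile(outputPath)
    } finally {
      sc.stop()
    }
  }
}
```

The output directory must not already exist, otherwise the save fails, so a fresh path per run is the usual choice.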

Upvotes: 3
