matan129

Reputation: 1697

Saving each Spark RDD entry separately to S3

I have a PySpark application that (obviously) loads and transforms data.

I want to persist the result RDD to S3, but the Spark-supplied .saveAsTextFile() function does not satisfy my requirements, because it writes multiple RDD entries into one file.

For example, let's assume that the RDD resultRDD is:

[('cats', cats_data), ('dogs', dogs_data), ('flamingos', flamingos_data)]

When calling resultRDD.saveAsTextFile('s3n://path/to/somewhere/'), it'll create multiple files, which may look like this:

1. 000_part00 [containing cats_data & dogs_data]
2. 000_part01 [containing only flamingos_data]

Note that the number of files created is not related to the number of elements in the RDD. Furthermore, I can't even know what's inside each file before opening it.

Instead, what I want to create is the following output:

1. cats [containing only cats_data]
2. dogs [containing only dogs_data]
3. flamingos [containing only flamingos_data]

I thought I might use boto's S3Connection and write to S3 manually, like this:

s3_connection = <connecting to S3 here>
bucket = s3_connection.get_bucket('animals_data')

def persist_to_s3(data_tuple):
    # one S3 object per (key, data) tuple
    s3_key = bucket.new_key(data_tuple[0])
    s3_key.set_contents_from_string(data_tuple[1])

resultRDD.foreach(persist_to_s3)  # an action is needed; map() alone is lazy

Unfortunately, the connection and bucket objects are neither serializable nor thread-safe (I presume), so I can't share the connection between nodes as in the snippet above.

I thought I might connect to S3 and get the bucket inside the persist_to_s3 function itself, but opening a connection per element will surely get my API usage throttled by AWS, since I have a massive RDD.
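For reference, that per-element variant would look roughly like this (a sketch only, using boto 2; animals_data is the example bucket from above). Every single element pays for its own connection and bucket lookup:

import boto

def persist_to_s3(data_tuple):
    # A fresh connection and bucket lookup for every RDD element --
    # exactly the part that would hammer the S3 API on a massive RDD.
    s3_connection = boto.connect_s3()
    bucket = s3_connection.get_bucket('animals_data')
    s3_key = bucket.new_key(data_tuple[0])
    s3_key.set_contents_from_string(data_tuple[1])

resultRDD.foreach(persist_to_s3)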

Clarification: My actual dataset is huge, and the keys are all unique. So re-partitioning by key won't help in this case.

The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower (see the sketch below).
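For completeness, here is roughly what I have in mind (a sketch, again assuming boto 2 and the animals_data bucket; the partition count is arbitrary, and I use foreachPartition() since nothing needs to be returned):

import boto

def persist_partition_to_s3(partition):
    # One connection and one bucket lookup per partition, reused for
    # every element inside that partition.
    s3_connection = boto.connect_s3()
    bucket = s3_connection.get_bucket('animals_data')
    for key_name, data in partition:
        bucket.new_key(key_name).set_contents_from_string(data)

# Fewer partitions means fewer connections, but also less parallelism.
resultRDD.coalesce(16).foreachPartition(persist_partition_to_s3)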

Upvotes: 1

Views: 1921

Answers (1)

Shawn Guo

Reputation: 3228

The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower.

repartition & mapPartitions is the relatively fast option, but you mentioned that it is slow. I think you can consider a more traditional solution, like a multi-threaded reader/writer:

1. Write the result data in the format you described.
2. Use a multi-threaded reader/writer model to write the data to S3 in parallel. The workflow is: parallel reader --read into--> concurrent blocking queue --> parallel writer --write to--> S3.
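A rough driver-side sketch of that workflow (a sketch only, assuming boto 2 and the animals_data bucket from the question; here the "reader" simply streams the RDD to the driver with toLocalIterator(), and the thread and queue sizes are arbitrary):

import threading
import boto

try:
    from queue import Queue      # Python 3
except ImportError:
    from Queue import Queue      # Python 2

NUM_WRITERS = 8                  # arbitrary; tune for your S3 throughput
work_queue = Queue(maxsize=1000) # the concurrent blocking queue

def writer():
    # Each writer thread keeps its own connection and uploads items
    # taken from the queue until it receives the poison pill (None).
    bucket = boto.connect_s3().get_bucket('animals_data')
    while True:
        item = work_queue.get()
        if item is None:
            break
        key_name, data = item
        bucket.new_key(key_name).set_contents_from_string(data)

threads = [threading.Thread(target=writer) for _ in range(NUM_WRITERS)]
for t in threads:
    t.start()

# "Parallel reader": stream the result back to the driver and feed the queue.
for record in resultRDD.toLocalIterator():
    work_queue.put(record)

for _ in threads:
    work_queue.put(None)         # one poison pill per writer
for t in threads:
    t.join()

Whether this beats coalesce + mapPartitions depends mostly on how fast the driver can stream the data and on how many writer threads S3 lets you run before throttling.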

Upvotes: 1
