Reputation: 1697
I have a PySpark application that (obviously) loads and transforms data.
I want to persist the result RDD to S3, but the Spark-supplied .saveAsTextFile()
function does not satisfy my requirements, because it writes multiple RDD entries into one file.
For example, let's assume that the RDD resultRDD is:
[('cats', cats_data), ('dogs', dogs_data), ('flamingos', flamingos_data)]
When calling resultRDD.saveAsTextFile('s3n://path/to/somewhere/'), it'll create multiple files, which may look like this:
1. 000_part00 [containing cats_data & dogs_data]
2. 000_part01 [containing only flamingos_data]
Note that the number of files created is not related to the number of elements in the RDD. Furthermore, I can't even know what's inside each file before opening it.
Instead, what I want to create is the following output:
1. cats [containing only cats_data]
2. dogs [containing only dogs_data]
3. flamingos [containing only flamingos_data]
I thought I might use boto's S3Connection and write to S3 manually, like this:
s3_connection = <connecting to S3 here>
bucket = s3_connection.get_bucket('animals_data')

def persist_to_s3(data_tuple):
    # one S3 key per RDD element, named after the element's key
    s3_key = bucket.new_key(data_tuple[0])
    s3_key.set_contents_from_string(data_tuple[1])

# note: map() is lazy, so an action would still be needed to trigger the writes
resultRDD.map(persist_to_s3)
Unfortunately, the connection and bucket objects are neither serializable nor thread-safe (I presume), so I can't share the connection between nodes as above.
I thought I might connect to S3 and get the bucket inside the persist_to_s3 function itself, but that would surely make AWS throttle my API usage, since I have a massive RDD.
Clarification: My actual dataset is huge, and the keys are all unique. So re-partitioning by key won't help in this case.
The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower. A rough sketch of what I mean is below.
Is repartition() / coalesce() the only proper way, or is there another approach that is a better fit for this use case?
Upvotes: 1
Views: 1921
Reputation: 3228
The other option that crossed my mind is to use repartition() / coalesce() in order to reduce the number of partitions and then perform the above operation with mapPartitions(), which will work but will be much slower.
repartition() + mapPartitions() is the relatively fast option here, but you mentioned that it is slow. In that case you can consider a more traditional solution, like a multi-threaded reader/writer:
1. Write the result data in the format you described;
2. Use a multi-threaded reader/writer model to write the data to S3 in parallel. The workflow looks like: parallel readers --read into--> concurrent blocking queue --> parallel writers --write to--> S3. See the sketch after this list.
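A minimal sketch of that reader/writer pattern, assuming boto credentials are configured on the driver machine; the bucket name 'animals_data' and the writer-thread count are placeholders, and toLocalIterator() is just one way to play the "reader" role that feeds the queue:

import threading
import boto
from Queue import Queue   # use "from queue import Queue" on Python 3

NUM_WRITERS = 8                      # placeholder thread count
work_queue = Queue(maxsize=1000)     # concurrent blocking queue

def writer():
    # each writer thread gets its own connection (boto objects aren't thread-safe)
    bucket = boto.connect_s3().get_bucket('animals_data')
    while True:
        item = work_queue.get()
        if item is None:             # poison pill -> stop this writer
            work_queue.task_done()
            break
        key_name, data = item
        bucket.new_key(key_name).set_contents_from_string(data)
        work_queue.task_done()

threads = [threading.Thread(target=writer) for _ in range(NUM_WRITERS)]
for t in threads:
    t.start()

# reader side: stream (name, data) pairs from the RDD into the queue
for key_name, data in resultRDD.toLocalIterator():
    work_queue.put((key_name, data))

for _ in threads:
    work_queue.put(None)             # signal writers to finish
work_queue.join()
for t in threads:
    t.join()

The blocking queue decouples the reading and writing rates, so slow S3 uploads don't stall the reader and you can tune the number of writer threads independently.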
Upvotes: 1