Reputation: 61
I am using pyspark to process my data, and at the very end I need to collect data from the RDD using rdd.collect(). However, Spark crashes with an out-of-memory error. I tried a number of approaches with no luck. I am now running the following code, which processes a small chunk of data for each partition:
def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
        # Do something with each row
The new code does not crash, but it seems to run forever.
Is there a better way to collect data from a large RDD?
Upvotes: 4
Views: 7507
Reputation: 5784
I don't know if this is the best way, but it's the best approach I've tried. Not sure whether it's better or worse than yours. It's the same idea of splitting the data into chunks, but you can be more flexible with the chunk size.
def rdd_iterate(rdd, chunk_size=1000000):
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size
Example usage where I want to append a huge RDD to a CSV file on disk without ever populating a Python list with the entire RDD:
def rdd_to_csv(fname, rdd):
    import csv
    import os
    # expanduser so a path like "~/test.csv" resolves correctly
    with open(os.path.expanduser(fname), "a") as f:
        c = csv.writer(f)
        for row in rdd_iterate(rdd):  # iterates through the entire RDD, one chunk at a time
            c.writerows([row])

rdd_to_csv("~/test.csv", my_really_big_rdd)
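Not part of the answer above, but worth mentioning for completeness: PySpark also exposes RDD.toLocalIterator(), which streams one partition at a time to the driver, so only a single partition ever has to fit in driver memory. A minimal sketch of the same CSV-append idea built on it (the helper name rdd_to_csv_streaming and the path handling are illustrative, not from the answer above):

import csv
import os

def rdd_to_csv_streaming(fname, rdd):
    # toLocalIterator() pulls one partition at a time to the driver,
    # so memory use is bounded by the largest partition, not the whole RDD.
    with open(os.path.expanduser(fname), "a") as f:
        writer = csv.writer(f)
        for row in rdd.toLocalIterator():
            writer.writerow(row)

The trade-off is that partitions are fetched sequentially, so it can be slower end-to-end than a chunked collect, but it avoids the extra zipWithIndex pass and the cached indexed RDD.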
Upvotes: 4
Reputation: 234
Trying to "collect" a huge RDD is problematic. "Collect" returns a list, which implies the entire RDD content has to be stored in the driver's memory. This is a "showstopper" problem. Typically one wants a Spark application to be able to process data sets whose size is well beyond what would fit in a single node's memory.
Let's suppose the RDD barely fits into memory, and "collect" works. Then we have another "showstopper" --- low performance. In your code, the collected RDD is processed in a loop: "for row in myCollection". This loop is executed by exactly one core. So instead of processing the data via an RDD, whose computations get distributed amongst all the cores of the cluster, of which there are probably 100's if not 1000's --- instead all the work on the entire dataset is placed on the back of a single core.
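To make that concrete, here is a minimal sketch of keeping the per-row work on the executors instead of looping on the driver; process_row and the output path are hypothetical placeholders for whatever "# Do something with each row" actually does:

def process_row(row):
    # hypothetical per-row logic; replace with the real work
    return row

# map() runs process_row on the executors, in parallel across all cores,
# instead of in a single-threaded loop on the driver.
processed = rdd.map(process_row)

# Write the results out from the cluster rather than collecting them
# back into driver memory.
processed.saveAsTextFile("hdfs:///path/to/output")

Only bring genuinely small results (counts, sums, samples) back to the driver with collect() or take().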
Upvotes: 2