JamesLi

Reputation: 61

What is the best practice for collecting a large data set from a Spark RDD?

I am using pyspark to process my data, and at the very end I need to collect data from the RDD using rdd.collect(). However, my Spark job crashes due to memory problems. I have tried a number of ways, but no luck. I am now running the following code, which processes a small chunk of data for each partition:

def make_part_filter(index):
    # Returns a partition function that only yields rows from partition `index`
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter


for part_id in range(rdd.getNumPartitions()):
    # Collect one partition at a time to limit driver memory usage
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
        # Do something with each row

The new code I am currently using does not crash, but it seems to run forever.

Is there a better way to collect data from a large RDD?

Upvotes: 4

Views: 7507

Answers (2)

sudo

Reputation: 5784

I don't know if this is the best way, but it's the best way I've tried. Not sure if it's better or worse than yours. It's the same idea, splitting the RDD into chunks, but you can be more flexible with the chunk size.

def rdd_iterate(rdd, chunk_size=1000000):
    # Pair each row with its index and cache, so the repeated filters below stay in memory
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        # Each chunk is a separate Spark job over the cached RDD, but only
        # `chunk_size` rows are collected to the driver at a time.
        chunk = indexed_rows.filter(lambda r: start <= r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size

Example usage, where I want to append a huge RDD to a CSV file on disk without ever materializing the entire RDD as a Python list:

def rdd_to_csv(fname, rdd):
    import csv
    import os
    # expanduser so a path like "~/test.csv" resolves to the home directory
    with open(os.path.expanduser(fname), "a") as f:
        writer = csv.writer(f)
        for row in rdd_iterate(rdd):  # iterates through the entire RDD, one chunk at a time
            writer.writerow(row)

rdd_to_csv("~/test.csv", my_really_big_rdd)
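
As an aside, depending on your Spark version, RDD.toLocalIterator() may give you much the same behaviour out of the box: it pulls one partition at a time to the driver, so peak driver memory is roughly the size of the largest partition. A minimal sketch of the same CSV idea (the function name rdd_to_csv_streaming is just for illustration):

def rdd_to_csv_streaming(fname, rdd):
    import csv
    import os
    with open(os.path.expanduser(fname), "a") as f:
        writer = csv.writer(f)
        # toLocalIterator() fetches partitions one by one instead of all at once
        for row in rdd.toLocalIterator():
            writer.writerow(row)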

Upvotes: 4

E.F.Walker

Reputation: 234

Trying to "collect" a huge RDD is problematic. collect() returns a list, which means the entire RDD content has to be stored in the driver's memory, and that is a showstopper. Typically you want a Spark application to be able to process data sets whose size is well beyond what would fit in a single node's memory.

Now suppose the RDD barely fits into memory and collect() works. Then we hit another showstopper: low performance. In your code, the collected RDD is processed in a driver-side loop, "for row in myCollection", and that loop is executed by exactly one core. So instead of processing the data through the RDD, whose computations get distributed across all the cores of the cluster (of which there are probably hundreds if not thousands), all the work on the entire dataset is placed on the back of a single core.
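
If the per-row work doesn't actually need to run on the driver, a minimal sketch of keeping it on the executors instead (process_row and the output path are hypothetical stand-ins for whatever "Do something with each row" actually does):

def process_row(row):
    # Hypothetical per-row work; replace with your actual logic
    return str(row)

# The map runs in parallel on the executors, and the results are written
# out from the executors as well, so nothing is ever collected to the driver.
rdd.map(process_row).saveAsTextFile("hdfs:///some/output/path")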

Upvotes: 2
