Marigold

Reputation: 1659

Gzip files with Spark

I have a Spark job that takes several thousand files as input, downloads them from Amazon S3, and processes them in the map phase, where each map step returns a string. I'd like to compress the outputs into a .tar.gz file and upload it to S3 afterwards. One way to do it is:

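import tarfile
import tempfile

# process is a placeholder name for the per-file map function that returns a string
outputs = sc.parallelize(filenames).map(process).collect()
with tempfile.NamedTemporaryFile() as tar_temp:
    tar = tarfile.open(tar_temp.name, "w:gz")
    for output in outputs:
        with tempfile.NamedTemporaryFile() as output_temp:
            output_temp.write(output)
            # flush so the data is on disk before tar reads the file
            output_temp.flush()
            tar.add(output_temp.name)
    tar.close()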

The problem is that the outputs don't fit into memory (but they do fit on disk). Is there a way to save the outputs to the master's filesystem during the map phase? Or could the "for output in outputs" loop consume a generator, so that I don't have to load everything into memory?

Upvotes: 2

Views: 1787

Answers (1)

Olivier Girardot

Reputation: 4648

In Spark 1.3.0 you will be able to use the same Java/Scala method toLocalIterator in Python.

The pull request has been merged: https://github.com/apache/spark/pull/4237

Here is the method's docstring:

    """
    Return an iterator that contains all of the elements in this RDD.
    The iterator will consume as much memory as the largest partition in this RDD.
    >>> rdd = sc.parallelize(range(10))
    >>> [x for x in rdd.toLocalIterator()]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    """

All in all, it will allow you to iterate over your outputs on the driver without collecting everything at once; only about one partition's worth of data needs to be in memory at a time.
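As a rough sketch of how this could fit the tar.gz use case: the helper below streams any iterable of strings into a gzip-compressed tar archive, one member per string, so it should work with the iterator returned by toLocalIterator. The function name write_tar_gz, the member naming scheme, the UTF-8 encoding, and the process function in the usage comment are all assumptions made for illustration, not part of Spark.

```python
import io
import tarfile


def write_tar_gz(outputs, tar_path):
    """Stream an iterable of strings into a .tar.gz, one member per string.

    Returns the number of members written.
    """
    count = 0
    with tarfile.open(tar_path, "w:gz") as tar:
        for index, output in enumerate(outputs):
            # Assumption: outputs are text and UTF-8 is an acceptable encoding.
            data = output.encode("utf-8")
            # Hypothetical naming scheme; use the real file names if you have them.
            info = tarfile.TarInfo(name="output_%05d.txt" % index)
            info.size = len(data)
            # addfile reads from an in-memory buffer, so no temp file per output.
            tar.addfile(info, io.BytesIO(data))
            count += 1
    return count


# Assumed usage with Spark 1.3.0 or later (process is a placeholder map function):
# rdd = sc.parallelize(filenames).map(process)
# write_tar_gz(rdd.toLocalIterator(), "/tmp/outputs.tar.gz")
```

Because the archive is written incrementally, the driver holds only the current output (plus whatever partition the iterator has fetched) in memory, and the finished file can then be uploaded to S3.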

Regards,

Upvotes: 1
