Luke

Reputation: 7089

Memory error when reading nested text files into Spark from S3

I'm trying to read around one million zipped text files into Spark from S3. The zipped size of each file is between 50 MB and 80 MB, about 6.5 terabytes of data in all.

Unfortunately I'm running into an out-of-memory exception that I don't know how to resolve. Something as simple as:

import subprocess

# List every object under the export prefix via the AWS CLI, build fully
# qualified S3 paths, and skip the zero-byte _SUCCESS marker files.
raw_file_list = subprocess.Popen("aws s3 ls --recursive s3://my-bucket/export/", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
cleaned_names = ["s3://my-bucket/" + f.split()[3] for f in raw_file_list if not f.endswith('_SUCCESS')]
dat = sc.textFile(','.join(cleaned_names))
dat.count()

Yields:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-22-8ce3c7d1073e> in <module>()
----> 1 dat.count()

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in count(self)
   1002         3
   1003         """
-> 1004         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1005 
   1006     def stats(self):

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in sum(self)
    993         6.0
    994         """
--> 995         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    996 
    997     def count(self):

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in fold(self, zeroValue, op)
    867         # zeroValue provided to each partition is unique from the one provided
    868         # to the final reduce call
--> 869         vals = self.mapPartitions(func).collect()
    870         return reduce(op, vals, zeroValue)
    871 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.pyc in collect(self)
    769         """
    770         with SCCallSiteSync(self.context) as css:
--> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    772         return list(_load_from_socket(port, self._jrdd_deserializer))
    773 

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/tmp/spark-tmp-lminer/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.OutOfMemoryError: GC overhead limit exceeded

Update:

Part of the issue seems to have been solved through this post. It seems that Spark was having difficulty munging so many files from S3. I've updated the error above so that it only reflects the memory issue now.

Upvotes: 4

Views: 751

Answers (1)

Luke

Reputation: 7089

The problem was that there were too many files. The solution seems to be to decrease the number of partitions by reading in a subset of files at a time and coalescing each subset down to a smaller number of partitions. You can't make the partitions too big, though: partitions in the 500-1000 MB range cause their own problems.
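A minimal sketch of that approach in PySpark, reusing the cleaned_names list from the question. The chunk_size and num_partitions values are illustrative assumptions, not tuned recommendations:

# Read the files in chunks and coalesce each chunk before unioning,
# so Spark never has to track one tiny partition per input file.
# chunk_size and num_partitions are illustrative; tune for your cluster.
chunk_size = 10000       # number of files read per pass
num_partitions = 256     # partitions to coalesce each chunk into

chunks = []
for i in range(0, len(cleaned_names), chunk_size):
    subset = cleaned_names[i:i + chunk_size]
    chunks.append(sc.textFile(','.join(subset)).coalesce(num_partitions))

dat = sc.union(chunks)   # combine the coalesced chunks into one RDD
dat.count()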

Upvotes: 1
