dionj

Reputation: 11

Pyspark memory error when reading S3 json files - "java.lang.OutOfMemoryError: GC overhead limit exceeded"

Using PySpark, I'm getting an error when attempting to load a large number of JSON files from S3 into a dataframe. The error seems to depend on the driver memory used by my Spark session. The error message reads "java.lang.OutOfMemoryError: GC overhead limit exceeded". I've spent a significant amount of time researching this online, but I haven't been able to find anything that points me to the exact cause of the error. Please find the full error message and code below.

I'd appreciate any help on this!


Driver environment
I'm using PySpark in a Jupyter notebook running within JupyterLab, which itself is running on an EC2 instance with 30GB of available RAM.

SparkSession resources
spark.executor.memory: 3GB
spark.executor.cores: 2
spark.driver.memory: 5GB
spark.cores.max: 300

Data
I'm trying to read about 500k JSON files stored in S3, with a total data size of 100+GB. Each file is a single record. I read the files as JSON using spark.read.json(), without a predefined schema. I realize that this way of storing and reading the data is far from ideal (Parquet would be preferable), but it is the status quo at my company at the moment.

Error inspection

Code

import findspark
findspark.init()
import pyspark

spark = (
    pyspark.sql.SparkSession.builder
    .config('spark.executor.memory', '3g')
    .config('spark.executor.cores', '2')
    .config('spark.driver.memory','5g')
    .config('spark.cores.max', '300')
    .getOrCreate()
)

data = (
    spark.read.json('s3a://some-bucket/some-prefix/year=2020/month=01/')
    .select('field1', 'field2', 'field3')
)

Full error

/opt/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding)
    272             path = [path]
    273         if type(path) == list:
--> 274             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    275         elif isinstance(path, RDD):
    276             def func(iterator):

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o57.json.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.String.substring(String.java:1969)
    at org.apache.hadoop.fs.Path.<init>(Path.java:219)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:254)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:243)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:243)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$createInMemoryFileIndex(DataSource.scala:533)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:371)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.createBaseDataset(JsonDataSource.scala:123)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:96)
    at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
    at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Upvotes: 1

Views: 3103

Answers (2)

Shubham Jain

Reputation: 5536

When we read JSON files without a predefined schema, the read operation is performed twice: first to load the data and infer the schema, then again to load it into the executors.

To avoid this, try getting the schema from a single file, or create a sample file that contains all the columns your JSON records consist of:

# Infer the schema from one representative file only
df = spark.read.json('/path/to/single.json')
schema = df.schema

# Reuse that schema when reading the full dataset, skipping inference
df2 = spark.read.json('path/to/all/files/', schema=schema)
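
If the structure is known up front, the schema can also be declared explicitly so that no files are scanned for inference at all. A minimal sketch, assuming hypothetical field names and string types (adjust both to match your data):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema; replace field names and types with your actual ones
schema = StructType([
    StructField('field1', StringType(), True),
    StructField('field2', StringType(), True),
    StructField('field3', StringType(), True),
])

# No inference pass: Spark reads the files only once, using this schema
df = spark.read.schema(schema).json('path/to/all/files/')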

Upvotes: 4

Logan

Reputation: 126

I don't have an answer for you, but I was running into the same error in PySpark while reading 20k-40k JSON files from HDFS. These files had hundreds to thousands of rows each, but the overall size of all the data in a folder was still < 10GB. Your post led me to try increasing my driver memory, since increasing my executor memory did not help. Clearly the driver has to keep track of which executors are doing which tasks, and I guess for an extreme number of input files the driver simply needs more memory. I'm not doing any collect() on my dataframes, just read and write, basically transferring from .json to .orc file types. Changing the driver memory from 1G to 4G seems to have solved my problem. I don't know if there is another answer for you other than that you will be limited by driver memory for an extreme number of input files.
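
For reference, a minimal sketch of the json-to-orc transfer described above, with the driver memory raised from 1g to 4g at session creation; the paths are hypothetical and the memory value should be adjusted to your environment:

import pyspark

# Driver memory raised to 4g, which resolved the GC overhead error here;
# this must be set before the session (and its JVM) is first created.
spark = (
    pyspark.sql.SparkSession.builder
    .config('spark.driver.memory', '4g')
    .getOrCreate()
)

# No collect() on the dataframe, just read and write
df = spark.read.json('hdfs:///path/to/json/input/')   # hypothetical input path
df.write.orc('hdfs:///path/to/orc/output/')           # hypothetical output path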

Upvotes: 1
