Reputation: 11
Using PySpark, I'm getting an error when attempting to load a high number of JSON files from S3 into a dataframe. The error seems to depend on the driver memory used by my Spark session. The error message reads "java.lang.OutOfMemoryError: GC overhead limit exceeded". I spent a significant amount of time doing online research, but I haven't been able to find anything that points me to the exact cause of this error. Please find the full error message and code below.
I'd appreciate any help on this!
Driver environment
I'm using PySpark in a Jupyter notebook running within JupyterLab, which itself is running on an EC2 instance with 30 GB of available RAM.
Sparksession resources
spark.executor.memory: 3GB
spark.executor.cores: 2
spark.driver.memory: 5GB
spark.cores.max: 300
Data
I'm trying to read about 500k JSON files stored in S3, with a total data size of 100+ GB. Each file is a single record. I read the files with spark.read.json(), without a predefined schema. I realize that this way of storing and reading the data is far from ideal (Parquet would be preferable), but it is the status quo at my company at the moment.
Error inspection
The error is raised during the call to spark.read.json. Does spark.read.json return something to the driver that seems to eat up RAM?
Code
import findspark
findspark.init()
import pyspark
spark = (
    pyspark.sql.SparkSession.builder
    .config('spark.executor.memory', '3g')
    .config('spark.executor.cores', '2')
    .config('spark.driver.memory', '5g')
    .config('spark.cores.max', '300')
    .getOrCreate()
)
data = (
    spark.read.json('s3a://some-bucket/some-prefix/year=2020/month=01/')
    .select('field1', 'field2', 'field3')
)
Full error
/opt/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding)
272 path = [path]
273 if type(path) == list:
--> 274 return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
275 elif isinstance(path, RDD):
276 def func(iterator):
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o57.json.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.<init>(String.java:207)
at java.lang.String.substring(String.java:1969)
at org.apache.hadoop.fs.Path.<init>(Path.java:219)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:254)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$bulkListLeafFiles$3.apply(InMemoryFileIndex.scala:243)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:243)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$createInMemoryFileIndex(DataSource.scala:533)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:371)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.createBaseDataset(JsonDataSource.scala:123)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:96)
at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:64)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:391)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Upvotes: 1
Views: 3103
Reputation: 5536
When Spark reads JSON files without a schema, the data is read twice: first to infer and construct the schema, then again to load the data into the executors.
To avoid this, get the schema from a single file, or create a sample file that contains all of the columns your JSON records can have:
# infer the schema from a single representative file
df = spark.read.json('/path/to/single.json')
schema = df.schema

# reuse that schema so the full read skips the inference pass
df2 = spark.read.json('path/to/all/files/', schema=schema)
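If the fields are known up front, another option (a sketch only, with placeholder field names and types taken from the question's select, not from the original answer) is to build the schema by hand so no inference pass is needed at all:

from pyspark.sql.types import StructType, StructField, StringType

# hand-built schema; field names and types are placeholders and must match your data
schema = StructType([
    StructField('field1', StringType(), True),
    StructField('field2', StringType(), True),
    StructField('field3', StringType(), True),
])

df2 = spark.read.json('path/to/all/files/', schema=schema)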
Upvotes: 4
Reputation: 126
I don't have an answer for you, but I was running into the same error in PySpark while reading 20k-40k JSON files from HDFS. These files had hundreds to thousands of rows each, but the overall size of all data in a folder was still < 10 GB. I think your post led me to try increasing my driver memory, since increasing my executor memory did not help. The driver has to keep track of which executors are handling which tasks, and I guess for an extreme number of input files the driver simply needs more memory. I'm not doing any collect() on my dataframes, just read and write - basically converting from .json to .orc files. Changing the driver memory from 1G to 4G seems to have solved my problem. I don't know if there is another answer for you, other than that you will be limited by driver memory for an extreme number of input files.
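For illustration, a minimal sketch of that kind of job under the settings described above; the paths are placeholders, and note that spark.driver.memory only takes effect if it is set before the driver JVM starts:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # give the driver enough headroom to track a very large number of input files
    .config('spark.driver.memory', '4g')
    .getOrCreate()
)

# no collect(): just read the many small JSON files and rewrite them as ORC
df = spark.read.json('hdfs:///path/to/json/folder/')
df.write.orc('hdfs:///path/to/orc/folder/')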
Upvotes: 1