Thagor

Reputation: 900

Spark: long garbage collection times when loading a large Parquet file

I have a large table saved as Parquet, and when I try to load it I get a crazy amount of GC time, around 80%. I am using Spark 2.4.3. The Parquet is saved with the following directory structure:

/parentfolder/part_0001/parquet.file
/parentfolder/part_0002/parquet.file
/parentfolder/part_0003/parquet.file
                [...]
2432 in total

The table is 2.6 TiB in total and looks like this (both fields are 64-bit ints):

+-----------+------------+
|        a  |         b  |
+-----------+------------+
|85899366440|515396105374|
|85899374731|463856482626|
|85899353599|661424977446|

          [...]

I have 7.4 TiB of cluster memory in total, with 480 cores across 10 workers, and I read the Parquet like this:

df = spark.read.parquet('/main/parentfolder/*/').cache()

As I said, I get a crazy amount of garbage collection time; right now it stands at Task Time (GC Time) | 116.9 h (104.8 h), with only 110 GiB loaded after 22 minutes of wall time. I monitored one of the workers and its memory usage usually hovers around 546G/748G.

What am I doing wrong here? Do I need a larger cluster? If my dataset is 2.6 TiB, why isn't 7.4 TiB of memory enough? But then again, why isn't the memory full on my workers?

Upvotes: 2

Views: 1412

Answers (2)

Ram Ghadiyaram

Reputation: 29237

See the Databricks article on this:

Tuning Java Garbage Collection for Apache Spark Applications

G1 GC Running Status (after Tuning)

-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms88g -Xmx88g -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20

You need garbage collector tuning in this case; try the example configuration above.

  • Also make sure that in your spark-submit you are passing the right parameters, such as executor memory and driver memory (see the sketch below).
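
A minimal sketch of wiring this together in PySpark (the memory sizes are illustrative placeholders, not tuned recommendations; when launching through spark-submit, driver memory must instead be given on the command line, since the driver JVM is already running by the time the session is built):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "64g")    # placeholder executor heap size
    .config("spark.driver.memory", "16g")      # placeholder driver heap size
    .config(
        "spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=20",
    )
    .getOrCreate()
)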

Use

getExecutorMemoryStatus(): scala.collection.Map<String, scala.Tuple2<Object, Object>>
Return a map from the slave to the max memory available for caching and the remaining memory available for caching.

Call and debug the getExecutorMemoryStatus API through PySpark's py4j bridge:

sc._jsc.sc().getExecutorMemoryStatus()
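
For example, a minimal sketch of inspecting the result (assuming an active SparkContext named sc; the call returns a py4j handle to a Scala Map keyed by "host:port"):

status = sc._jsc.sc().getExecutorMemoryStatus()
print(status.size())       # number of block managers reporting (executors plus the driver)
print(status.toString())   # Map(host:port -> (max memory for caching, remaining memory), ...)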

Upvotes: 0

Steven

Reputation: 15318

Just try to remove .cache(). There are only a few cases where you need to cache your data; the most obvious one is a single DataFrame reused across several actions. But if your DataFrame is that big, do not use cache. Use persist:

from pyspark import StorageLevel
df = spark.read.parquet('/main/parentfolder/*/').persist(StorageLevel.DISK_ONLY)
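
A short follow-up sketch (still using the df defined above): the persisted data is only materialized once an action runs, and the blocks can be released explicitly afterwards.

print(df.count())   # the first action fills the DISK_ONLY store
df.unpersist()      # drop the persisted blocks when done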

Upvotes: 1
