Gaurav Madan

Reputation: 1

Spark Driver going OOM

For certain DataFrames, applying the withColumn method in Spark 3.4.1 causes the driver to run out of memory, while the same DataFrames are processed successfully in Spark 2.4.0.

Heap Dump Analysis: I enabled heap dumps on out-of-memory errors with the following JVM options: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-job-fail.hprof
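
In YARN cluster mode these flags have to reach the driver JVM at launch, so they are passed through spark.driver.extraJavaOptions (the same mechanism used in the spark-defaults classification further below); a minimal sketch of the submit-time form:

--conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-job-fail.hprof"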

I analyzed the stack trace and found the following significant frames and local variables:

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:4273)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:1622)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2820)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset; (Dataset.scala:2759)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset; (DataPersistenceUtil.scala:88)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V (DataPersistenceUtil.scala:19)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V (BronzeStep.scala:23)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V (MainJob.scala:78)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes
com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V (MainJob.scala:66)
org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

Driver Configuration: Driver instance: c6g.xlarge with 4 vCPUs and 8 GB RAM. spark.driver.memory and spark.driver.memoryOverhead are set to default values.

Observations: The DataFrame schema is deeply nested and large, which might be contributing to the memory issue. Despite similar configurations, Spark 2.4.0 processes the DataFrame without issues, while Spark 3.4.1 does not.

Schema of dataframe: https://drive.google.com/file/d/1wgFB0_WvdQdGoEMGFePhZwLR7aQZ5fPn/view?usp=sharing

Kindly find the configurations for both setups below:

All Spark configurations in the Spark 2.4.0 setup

Cluster setup configuration:

{
 "BootstrapActions": [
   {
     "Name": "Install EP dependencies",
     "ScriptBootstrapAction": {
       "Path": "s3://uc-emr-data/data-platform/configs/prod/ep/ep_bootstrap_graviton_sh.sh"
     }
   }
 ],
 "Configurations": [
   {
     "Classification": "spark",
     "Properties": {
       "maximizeResourceAllocation": "true"
     }
   },
   {
     "Classification": "capacity-scheduler",
     "Properties": {
       "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
     }
   },
   {
     "Classification": "yarn-site",
     "Properties": {
       "yarn.nodemanager.vmem-check-enabled": "false",
       "yarn.nodemanager.pmem-check-enabled": "false"
     },
     "Configurations": [
      
     ]
   },
   {
     "Classification": "spark-defaults",
     "Properties": {
       "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35",
       "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35",
       "spark.shuffle.compress": "true",
       "spark.shuffle.spill.compress": "true"
     },
     "Configurations": [
      
     ]
   },
   {
     "Classification": "spark-env",
     "Properties": {
      
     },
     "Configurations": [
       {
         "Classification": "export",
         "Properties": {
           "PYSPARK_PYTHON": "/usr/bin/python3"
         },
         "Configurations": [
          
         ]
       }
     ]
   }
 ],
 "Applications": [
   {
     "Name": "Hadoop"
   },
   {
     "Name": "Spark"
   }
 ]
}


Spark Step: 

[
 {
   "ActionOnFailure": "TERMINATE_CLUSTER",
   "HadoopJarStep": {
     "Jar": "command-runner.jar",
     "Args": [
       "spark-submit",
       "--master",
       "yarn",
       "--deploy-mode",
       "cluster",
       "--packages",
       "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.delta:delta-core_2.11:0.6.1",
       "--conf",
       "spark.databricks.delta.optimizeWrite.enabled=true",
       "--conf",
       "spark.databricks.delta.autoCompact.enabled=true",
       "--conf",
       "spark.databricks.delta.retentionDurationCheck.enabled=false",
       "--conf",
       "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/home/hadoop/log4j.properties",
       "--conf",
       "spark.task.maxFailures=8",
       "--conf",
       "spark.yarn.maxAppAttempts=5",
       "--conf",
       "spark.dynamicAllocation.executorIdleTimeout=600s",
       "--conf",
       "spark.streaming.dynamicAllocation.enabled=true",
       "--conf",
       "spark.serializer=org.apache.spark.serializer.KryoSerializer",
       "--conf",
       "spark.streaming.receiver.writeAheadLog.enable=true",
       "--name",
       "Event_Persistence_Spark_Job",
       "--class",
       "com.urbanclap.dp.eventpersistence.SparkJob",
       "s3://uc-emr-data/data-platform/deployment/ep/prod/latest/event-persistence-job-assembly-2.4.2.jar",
       "--env",
       "prod",
       "--job_mode",
       "main",
       "--bucket_id",
       "ep_json_daily_1"
     ]
   },
   "Name": "Event_Persistence_Spark_Submit"
 }
]

Config given through code:

.config("spark.executor.memory", "18g")
.config("spark.driver.maxResultSize", "0")
.config("spark.executor.cores", "15")
.config("spark.executor.memoryOverhead", "1024")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.master", "yarn")

All Spark configurations in the Spark 3.4.1 setup

Cluster setup configuration:

{
 "BootstrapActions": [
   {
     "Name": "Install EP dependencies",
     "ScriptBootstrapAction": {
       "Path": "s3://uc-emr-data/data-platform/configs/prod/ep/ep_bootstrap_graviton_sh.sh"
     }
   }
 ],
 "Configurations": [
   {
     "Classification": "spark",
     "Properties": {
       "maximizeResourceAllocation": "true"
     }
   },
   {
     "Classification": "capacity-scheduler",
     "Properties": {
       "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
     }
   },
   {
     "Classification": "yarn-site",
     "Properties": {
       "yarn.nodemanager.vmem-check-enabled": "false",
       "yarn.nodemanager.pmem-check-enabled": "false",
       "yarn.node-labels.enabled": "true",
       "yarn.node-labels.am.default-node-label-expression": "CORE"
     },
     "Configurations": []
   },
   {
     "Classification": "spark-defaults",
     "Properties": {
       "spark.executor.extraJavaOptions": "-Xss4m -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35",
       "spark.driver.extraJavaOptions": "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-job-fail.log -Dlog4j.configuration=file:/home/hadoop/log4j.properties -Xss1024m -XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35",
       "spark.shuffle.compress": "true",
       "spark.shuffle.spill.compress": "true"
     },
     "Configurations": []
   },
   {
     "Classification": "spark-env",
     "Properties": {},
     "Configurations": [
       {
         "Classification": "export",
         "Properties": {
           "PYSPARK_PYTHON": "/usr/bin/python3"
         },
         "Configurations": []
       }
     ]
   }
 ],
 "Applications": [
   {
     "Name": "Hadoop"
   },
   {
     "Name": "Spark"
   }
 ]
}


Spark steps

[
 {
   "ActionOnFailure": "CANCEL_AND_WAIT",
   "HadoopJarStep": {
     "Jar": "command-runner.jar",
     "Args": [
       "spark-submit",
       "--master",
       "yarn",
       "--deploy-mode",
       "cluster",
       "--packages",
       "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
       "--conf",
       "spark.task.maxFailures=8",
       "--conf",
       "spark.yarn.maxAppAttempts=5",
       "--conf",
       "spark.streaming.dynamicAllocation.enabled=true",
       "--conf",
       "spark.serializer=org.apache.spark.serializer.KryoSerializer",
       "--conf",
       "spark.streaming.receiver.writeAheadLog.enable=true",
       "--conf",
       "spark.scheduler.maxRegisteredResourcesWaitingTime=300s",
       "--conf",
       "spark.dynamicAllocation.executorIdleTimeout=600s",
       "--name",
       "Event_Persistence_Spark_Job",
       "--class",
       "com.urbanclap.dp.eventpersistence.SparkJob",
       "s3://uc-emr-data/data-platform/tx/prod/bronze_checkpoint/sparkTest/event-persistence-job-assembly-2.5.0.jar",
       "--env",
       "prod",
       "--job_mode",
       "main",
       "--bucket_id",
       "ep_json_daily_5"
     ]
   },
   "Name": "Event_Persistence_Spark_Submit"
 }
]


.config("spark.driver.maxResultSize", “0”)
.config("spark.executor.memory", "18G")
.config("spark.executor.cores", "5")
.config("spark.executor.memoryOverhead", "1024")
.config("spark.dynamicAllocation.enabled", "true")

Tried Solutions:

- Disabled Adaptive Query Execution: spark.sql.adaptive.enabled = false
- Set driver max result size: spark.driver.maxResultSize = "1g" (earlier we used 0)
- Increased driver cores to 2: spark.driver.cores = 2
- Enabled specific optimizer rules: conf.set("spark.sql.optimizer.enabledRules", "FilterPushdown,ColumnPruning")

Despite these changes, the driver still could not schedule tasks on the executors. However, when the driver memory was increased to 48 GB and the memory overhead to 5 GB, the driver was able to schedule the tasks successfully. Are there any additional configurations or optimizations that could help mitigate this memory issue without always resorting to a larger machine? Please provide insights or recommendations on how to resolve this issue effectively.
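
For reference, a minimal sketch of pinning the values that worked (48 GB heap, 5 GB overhead, the figures mentioned above) at submit time; in YARN cluster mode spark.driver.memory has to be set before the driver JVM starts, so it belongs on spark-submit (or in spark-defaults), not in the SparkSession builder:

--conf spark.driver.memory=48g
--conf spark.driver.memoryOverhead=5g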

Upvotes: 0

Views: 250

Answers (1)

parisni

Reputation: 1152

When iterating withColumn over nested fields, the Spark 3 driver can run into OOM. I can't explain the root cause, but in my case creating a fresh plan for the DataFrame after each iteration fixed the issue.

One way is to create a new DataFrame after the withColumn (PySpark):

newdf = spark.createDataFrame(olddf.rdd, olddf.schema)
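
As a rough sketch of how this can look inside a column-adding loop (the input path and column names below are made up for illustration, and df.localCheckpoint() is shown as an alternative way of truncating the plan, at the cost of materialising the data):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("s3://some-bucket/raw/")                # hypothetical input

new_cols = ["col_a", "col_b", "col_c", "col_d"]              # hypothetical columns
for i, col_name in enumerate(new_cols):
    df = df.withColumn(col_name, F.lit(None).cast("string"))
    # Periodically rebuild the DataFrame so the driver does not keep
    # growing one huge analyzed plan.
    if (i + 1) % 2 == 0:
        # The approach above: same data and schema, fresh logical plan
        df = spark.createDataFrame(df.rdd, df.schema)
        # Alternative: truncate lineage by materialising the data
        # df = df.localCheckpoint()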

Upvotes: 0
