Reputation: 840
I have a Spark job that runs SQL joins.
I visualized the DAG, and each join adds about 5 stages. After the DAG reaches around 40 stages (i.e. after 8 iterations of roughly 5 stages each), the next step always fails with the following exception:
Exception in thread "main" java.lang.OutOfMemoryError
    at java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161)
    at java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:125)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at scala.StringContext.standardInterpolator(StringContext.scala:125)
    at scala.StringContext.s(StringContext.scala:95)
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:230)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2420)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2419)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2419)
    at com.samsung.cloud.mopay.linking.controller.PostNotificLinkController.linkPostNotific(PostNotificLinkController.java:51)
    at com.samsung.cloud.mopay.linking.txn.TxnLinking.performTxnLinking(TxnLinking.java:26)
    at com.samsung.cloud.mopay.linking.Linking.processData(Linking.java:199)
    at com.samsung.cloud.mopay.linking.Linking.main(Linking.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I am running Spark with:
--conf spark.driver.memory=30g
--conf spark.yarn.driver.memoryOverhead=15g
--conf spark.executor.memory=10g
--conf spark.yarn.executor.memoryOverhead=6g
--conf spark.executor.cores=5
3 executor instances per node (r3.2xlarge) => 12 executor instances
Upvotes: 2
Views: 1444
Reputation: 495
The root issue is that you keep applying transformations to a DataFrame without ever persisting it or truncating its lineage.
Why lineage/DAG? Spark uses lineage to provide fault tolerance. There is a trade-off between how much fault tolerance your application actually needs and how large the plan behind the DataFrame is allowed to grow.
Ways to avoid it:
After a few expensive transformations you can:
- persist the DataFrame and force it with an action, so later plans read from the cached data;
- checkpoint the DataFrame, which saves it to reliable storage and truncates the lineage; or
- write the intermediate result to disk and read it back as a fresh DataFrame.
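A minimal sketch of the checkpoint approach in Java (the class name, input path, and checkpoint directory are placeholders, not from the original post; Dataset.checkpoint() is available from Spark 2.1 onwards, on older versions persist or write-and-reload instead):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class LineageTruncationSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().appName("linking-sketch").getOrCreate();
            // Checkpointing needs a directory on reliable storage (e.g. HDFS).
            spark.sparkContext().setCheckpointDir("hdfs:///tmp/spark-checkpoints");

            Dataset<Row> df = spark.read().parquet("hdfs:///data/input");  // placeholder input
            // ... a few expensive joins / transformations on df ...

            // checkpoint() materializes the result and truncates the lineage,
            // so subsequent plans no longer carry the whole join history.
            df = df.checkpoint();

            // Continue joining on the checkpointed DataFrame.
        }
    }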
Upvotes: 1
Reputation: 840
The solution was to persist the data after a few stages. This starts a new execution plan instead of adding to the existing one, which otherwise keeps growing and eventually runs out of memory.
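For illustration, a sketch of that pattern in Java (the method, the list of tables, the join key, and persisting every third join are all hypothetical, not the actual code from the question):

    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.storage.StorageLevel;

    public class IterativeJoinSketch {
        // Joins 'base' with each table in turn, persisting every few iterations
        // so each new plan reads from the cached result instead of growing
        // the previous plan further.
        static Dataset<Row> joinAll(Dataset<Row> base, List<Dataset<Row>> tables, String key) {
            Dataset<Row> result = base;
            for (int i = 0; i < tables.size(); i++) {
                result = result.join(tables.get(i), key);
                if (i % 3 == 2) {                                        // every 3rd join, for example
                    result = result.persist(StorageLevel.MEMORY_AND_DISK());
                    result.count();                                      // action to materialize the cache
                }
            }
            return result;
        }
    }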
Upvotes: 2
Reputation: 1464
I can give you a couple of ideas. Without knowing more about your datasets (size and basic statistics) and your basic Spark configs (parallelism), these are just my top guesses:
How big are your datasets and your partitions, and how many partitions do you have? Try using more, smaller partitions. What are spark.default.parallelism and spark.sql.shuffle.partitions currently set to?
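For example, assuming an existing SparkSession named spark (the value 400 is only an illustration; tune it to your data and cluster):

    // Check the current setting (200 by default).
    System.out.println(spark.conf().get("spark.sql.shuffle.partitions"));

    // Either set it on the session before running the joins...
    spark.conf().set("spark.sql.shuffle.partitions", "400");

    // ...or pass it on the command line:
    // --conf spark.sql.shuffle.partitions=400 --conf spark.default.parallelism=400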
Maybe there is a hot spot in your data: are there keys with a very large number of records in any of the joined datasets? Analyze your data, collect some statistics on every join operand, and look at the most frequent join keys in each (a skewed join can multiply your data).
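A quick way to look for skew, assuming a DataFrame df and a join column "id" (both placeholders):

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.count;

    // Count records per join key and show the heaviest ones.
    df.groupBy(col("id"))
      .agg(count("*").alias("cnt"))
      .orderBy(col("cnt").desc())
      .show(20);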
If you can provide more details then I'll give you more educated guesses.
Upvotes: -1