Reputation: 331
I have a script that processes a couple hundred GB of data, and I run into trouble when I try to process more than 500 GB; below that, everything works fine. While debugging the app, I first got an error about exceeding the spark.driver.maxResultSize limit, so I increased that value to 4g and the task that was failing now works. BUT now I have another problem: when I try to save the results to a Parquet file, the task fails and throws this error:
17/01/27 06:35:27 INFO DAGScheduler: Job 7 failed: parquet at NativeMethodAccessorImpl.java:-2, took 12.106390 s
17/01/27 06:35:27 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 146:0 was 765207245 bytes, which exceeds max allowed: spark.akka.frameSize (134217728 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize
So it seems I need to increase the spark.akka.frameSize value as well.
My question is: I'm already setting maxResultSize with SparkConf().set, but I don't know the syntax for setting both values on the same SparkConf().
This is how my code looks in those parts:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = (SparkConf().set("spark.driver.maxResultSize", "4g"))
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
And the task that's failing:
sqlContext.sql(sql).coalesce(5).write.parquet(sys.argv[3], mode='overwrite')
Just one thing: I can't modify the conf files on the Spark cluster, and we use Luigi to submit the task to Spark, so I can't modify the spark-submit command at execution time either (that's why I'm setting the parameters directly from the script).
Any guidance is appreciated.
Upvotes: 1
Views: 2546
Reputation: 9067
RTFM - straight from the Spark 1.6.3 Python API documentation...
class pyspark.SparkConf (...)
All setter methods in this class support chaining.
For example, you can write conf.setMaster("local").setAppName("My app")
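Applied to your case, a minimal sketch (assuming Spark 1.6 with PySpark; the "1000" for spark.akka.frameSize is just an example value in MB, sized above the ~730 MB task in your error, not a verified setting) that chains both .set() calls on the same SparkConf before the context is created:

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

# Chain .set() calls on a single SparkConf; both properties must be set
# before the SparkContext is created.
conf = (SparkConf()
        .set("spark.driver.maxResultSize", "4g")
        .set("spark.akka.frameSize", "1000"))  # spark.akka.frameSize is specified in MB

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

The same chaining works for any number of configuration properties, since each .set() returns the SparkConf itself.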
Upvotes: 1