Reputation: 611
New to Spark jobs and I have the following problem.
When I run a count on any of the newly joined dataframes, the job runs for ages and spills memory to disk. Is there any logic error in here?
// pass spark configuration
val conf = new SparkConf()
.setMaster(threadMaster)
.setAppName(appName)
// Create a new spark context
val sc = new SparkContext(conf)
// Specify a SQL context and pass in the spark context we created
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create three dataframes for the sent, clicked and failed files. Mark them as raw, since they will be renamed
val dfSentRaw = sqlContext.read.parquet(inputPathSent)
val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
val dfFailedRaw = sqlContext.read.parquet(inputPathFailed)
// Rename the columns to avoid ambiguity when accessing the fields later
val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
.withColumnRenamed("campaign_id", "sent__campaign_id")
.withColumnRenamed("ced_email", "sent__ced_email")
.withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
.withColumnRenamed("riid", "sent__riid")
val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
.withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")
// LEFT Join with CLICKED on two fields, customer_id and campaign_id
val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
&& dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
dfSentClicked.count() //THIS WILL NOT WORK
val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
&& dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")
Why can't these two/three dataframes be counted anymore? Did I mess up some indexing by renaming?
Thank you!
Upvotes: 1
Views: 909
Reputation: 2885
That count call is the only actual materialization of your Spark job here, so it's not really count that is the problem but the shuffle being done for the join right before it. You don't have enough memory to do the join without spilling to disk. Spilling to disk in a shuffle is a very easy way to make your Spark jobs take forever =).
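For context, the join itself is only a lazy transformation; nothing executes until an action such as count forces the plan, shuffle included. A minimal sketch of that distinction, reusing the dataframes from the question:
// Building the join only records a lazy plan; no shuffle runs yet
val lazyJoin = dfSent.join(dfClicked,
  dfSent("sent__customer_id") === dfClicked("clicked__customer_id"), "left")
// count() is the action that actually triggers the shuffle (and any spilling)
val rows = lazyJoin.count()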
One thing that really helps prevent spilling in shuffles is having more partitions, so there is less data moving through the shuffle at any given time. You can set spark.sql.shuffle.partitions, which controls the number of partitions Spark SQL uses for a join or aggregation. It defaults to 200, so you can try a higher setting. http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
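As a sketch (the value 800 is only an illustration, not a tuned recommendation), you could set it on the SQLContext the question already creates, before running the joins:
// Illustrative value only: raise the shuffle partition count used by Spark SQL joins
sqlContext.setConf("spark.sql.shuffle.partitions", "800")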
You could increase the heap size of your local Spark allocation and/or increase the fraction of memory usable for shuffles by increasing spark.shuffle.memoryFraction (defaults to 0.2) and decreasing spark.storage.memoryFraction (defaults to 0.6). The storage fraction is used, for example, when you make a .cache call, and you might not care about that here.
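For example, a sketch on the same SparkConf the question builds (the fraction values and heap size here are illustrative assumptions, not tuned recommendations):
val conf = new SparkConf()
  .setMaster(threadMaster)
  .setAppName(appName)
  // Illustrative only: give shuffles a larger share of executor memory
  .set("spark.shuffle.memoryFraction", "0.5")
  // ...and shrink the storage/cache share to compensate
  .set("spark.storage.memoryFraction", "0.3")
  // Optionally raise the executor heap as well (value is an example)
  .set("spark.executor.memory", "4g")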
If you want to avoid the spills outright, you can turn off spilling by setting spark.shuffle.spill to false. I believe this will throw an exception if you run out of memory and need to spill, instead of silently taking forever, and it could help you tune your memory allocation faster.
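For example, a sketch on the same SparkConf (note that newer Spark versions may ignore this setting):
// Fail fast with an error instead of spilling shuffle data to disk
conf.set("spark.shuffle.spill", "false")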
Upvotes: 1