ZedBrannigan

Reputation: 611

Spark doesn't let me count my joined dataframes

I'm new to Spark jobs and I have the following problem.

When I run a count on any of the newly joined dataframes, the job runs for ages and spills memory to disk. Is there a logic error in here?

    // pass spark configuration
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)

    // Create a new spark context
    val sc = new SparkContext(conf)

    // Specify a SQL context and pass in the spark context we created
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)


    // Create three dataframes for the sent, clicked and failed files. Mark them as raw, since they will be renamed
    val dfSentRaw = sqlContext.read.parquet(inputPathSent)
    val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
    val dfFailedRaw  = sqlContext.read.parquet(inputPathFailed)



    // Rename the columns to avoid ambiguity when accessing the fields later
    val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
      .withColumnRenamed("campaign_id", "sent__campaign_id")
      .withColumnRenamed("ced_email", "sent__ced_email")
      .withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
      .withColumnRenamed("riid", "sent__riid")


    val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
      .withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
    val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")


    // LEFT Join with CLICKED on two fields, customer_id and campaign_id
    val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
      && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
    dfSentClicked.count() // THIS WILL NOT WORK

    val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
      && dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")

Why can't these two/three dataframes be counted anymore? Did I mess up some indexing by renaming?

Thank you!


Upvotes: 1

Views: 909

Answers (1)

Rich

Reputation: 2885

That count call is the only actual materialization of your Spark job here, so it's not really count that's the problem but the shuffle being done for the join right before it. You don't have enough memory to do the join without spilling to disk, and spilling to disk in a shuffle is a very easy way to make your Spark jobs take forever =).
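To make that concrete, here is a minimal sketch reusing the dataframes from your question: the join is a lazy transformation that only builds a query plan, and the shuffle actually runs once count forces it.

    // join is lazy: this line only builds a query plan, nothing executes yet
    val plan = dfSent.join(dfClicked,
      dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
        && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")

    // count is an action: it triggers the whole plan, including the join's shuffle
    plan.count()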

One thing that really helps prevent spilling in shuffles is having more partitions, so there is less data moving through the shuffle at any given time. You can set spark.sql.shuffle.partitions, which controls the number of partitions used by Spark SQL in a join or aggregation. It defaults to 200, so you can try a higher setting. http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
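For example, a minimal sketch of raising that setting on the SQLContext from your question, before the joins run (2000 is an arbitrary illustrative value, not a tuned recommendation):

    // More shuffle partitions means less data per partition during the join,
    // so each task is less likely to spill to disk
    sqlContext.setConf("spark.sql.shuffle.partitions", "2000")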

You could also increase the heap size of your local Spark allocation and/or increase the fraction of memory usable for shuffles by increasing spark.shuffle.memoryFraction (defaults to 0.2) and decreasing spark.storage.memoryFraction (defaults to 0.6). The storage fraction is used, for example, when you make a .cache call, and you might not care about that here.
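As a sketch, this would go where you already build the SparkConf, before creating the SparkContext (the 0.4/0.4 split below is a hypothetical starting point, not a tuned value):

    // Shift the memory split toward shuffles, since this job caches nothing
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)
      .set("spark.shuffle.memoryFraction", "0.4") // more room for shuffle buffers
      .set("spark.storage.memoryFraction", "0.4") // less room for cached blocks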

If you absolutely want to avoid the spills outright, you can turn off spilling by setting spark.shuffle.spill to false. I believe this will throw an exception if you run out of memory and need to spill, instead of silently taking forever, and that could help you tune your memory allocation faster.
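That is a one-line addition to the same SparkConf; to be clear, it trades slow spills for a hard failure when memory runs out, so it's mainly useful while tuning:

    // Fail fast instead of spilling to disk when shuffle memory runs out
    conf.set("spark.shuffle.spill", "false")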

Upvotes: 1
