teirce

Reputation: 11

Spark Memory Usage Concentrated on Driver / Master

I'm currently developing a Spark (v2.2.0) Streaming application and am running into issues with the way Spark seems to be allocating work across the cluster. The application is submitted to AWS EMR in client mode, so there is a driver node and a couple of worker nodes. Here is a screenshot of Ganglia showing memory usage over the last hour:

[Ganglia screenshot: memory usage for the three cluster nodes over the last hour]

The left-most node is the "master" or "driver" node, and the other two are worker nodes. There are spikes in the memory usage for all three nodes that correspond to workloads coming in through the stream, but the spikes are not equal (even when scaled to % memory usage). When a large workload comes in, the driver node appears to be overworked, and the job will crash with an error regarding memory:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x000000053e980000, 674234368, 0) failed; error='Cannot allocate memory' (errno=12)

I've also run into this when the master runs out of memory: Exception in thread "streaming-job-executor-10" java.lang.OutOfMemoryError: Java heap space. This is equally confusing, as my understanding is that "client" mode should not use the driver / master node as an executor.
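
For what it's worth, my mental model of where the work should land is roughly the following. This is only an illustrative sketch, not my actual code; the stream and the score function are stand-ins. My understanding is that the body of foreachRDD runs on the driver for every batch, while only the functions passed to transformations like map are shipped to the executors:

    import java.sql.Timestamp
    import org.apache.spark.sql.Row
    import org.apache.spark.streaming.dstream.DStream

    // Illustrative sketch only -- `stream` and `score` are stand-ins, not my actual code.
    // The function given to foreachRDD executes on the driver for every batch; the
    // function given to map is serialized out and executed on the executors.
    def process(stream: DStream[Row], score: (Row, Timestamp) => Double): Unit = {
      stream.foreachRDD { rdd =>
        val batchTime = new Timestamp(System.currentTimeMillis()) // driver side
        val scored = rdd.map(row => score(row, batchTime))        // executor side
        scored.count()                                            // action: kicks off the distributed job
      }
    }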

Pertinent details:

I'm certainly at a loss here. I'm not sure what in the code is triggering the driver to hog the workload like this.

The only suspect I can think of is a code snippet similar to the following:

      // Partially apply the scoring function, fixing batchTime for this batch
      val scoringAlgorithm = HelperFunctions.scoring(_: Row, batchTime)
      // Map the resulting Row => score function over the Dataset
      val rawScored = dataToScore.map(scoringAlgorithm)

Here, a function is loaded from a singleton object and used to map over the Dataset. My understanding is that Spark will serialize this function across the cluster (re: http://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#passing-functions-to-spark). However, perhaps I am mistaken and Spark is simply running this transformation on the driver.
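
For reference, the helper is shaped roughly like this. This is a simplified sketch: the batchTime type and the return type are assumptions, and the actual scoring logic is elided.

    import java.sql.Timestamp
    import org.apache.spark.sql.Row

    // Simplified sketch of the helper object; the signature details are assumptions
    // and the scoring logic is omitted. The point is that the function lives in a
    // singleton object and is partially applied (fixing batchTime) before being
    // passed to map.
    object HelperFunctions {
      def scoring(row: Row, batchTime: Timestamp): Double = {
        // ... actual scoring logic omitted ...
        0.0
      }
    }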

If anyone has any insight to this issue, I would love to hear it!

Upvotes: 1

Views: 1844

Answers (1)

teirce

Reputation: 11

I ended up solving this issue. Here's how I addressed it:

I made an incorrect assertion when stating the problem: there was a collect() statement at the beginning of the Spark program.

I had a transaction that, as it was originally designed, required collect() to run. My assumption was that calling repartition(n) on the resulting data would split it back up amongst the executors in the cluster. From what I can tell, this strategy does not work: once the data has been collected to the driver, it stays there. After I rewrote this line, Spark started behaving as I expected and farming jobs out to the worker nodes.
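
To illustrate the shape of the mistake, here is a reconstruction with made-up names (spark is the SparkSession and dataToScore stands in for the Dataset involved); it is not the actual code:

    import spark.implicits._

    // Anti-pattern (reconstruction, not the actual code): collect() materializes the
    // whole Dataset in driver memory, and re-parallelizing it afterwards does not
    // undo that cost.
    val ids: Array[Long] = dataToScore.select("id").as[Long].collect() // everything lands on the driver
    val redistributed = spark.createDataset(ids.toSeq).repartition(8)  // too late: the driver already held it all

    // Preferred: keep the data as a Dataset end to end so the work stays on the executors.
    val idsDs = dataToScore.select("id").as[Long]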

My advice to any lost soul who stumbles across this issue: don't call collect() unless it's at the very end of your Spark program. You cannot recover from it. Find another way to perform your task. (I ended up switching a SQL transaction from where col in (...) syntax to a join on the database.)
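
Roughly, the shape of that change looks like the following sketch. The table and column names are placeholders; in my case the join ended up being done on the database side, but the same idea applies if everything stays in Spark:

    import spark.implicits._

    // Placeholder names throughout; illustrative only.
    // Before: collect the keys to the driver and splice them into an IN (...) clause.
    val keys = keysDf.select("key").as[String].collect()
    val inList = keys.map(k => s"'$k'").mkString(", ")
    val viaInClause = spark.sql(s"SELECT * FROM events WHERE key IN ($inList)")

    // After: express the same filter as a join, so nothing is collected to the driver.
    val viaJoin = spark.table("events").join(keysDf, Seq("key"))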

Upvotes: 0
