user2519865

Reputation: 31

Spark Streaming driver process goes out of memory

I have a problem with the driver process in a Spark Streaming application. The issue is that the driver process goes out of memory. There is no problem with either the master or the worker nodes (they all run fine for days on end). But even with a very limited feed (two messages every 5 minutes; processing one message takes less than 100 ms), I get OOM errors in the driver process after some time (e.g. a weekend).

Here are more details:

I have a simple Spark Streaming application that consumes events from an MQTT data source and stores them in a database. I'm using a small Spark cluster with 1 master and 2 worker nodes. I have 1 driver process (started using spark-submit with deploy-mode client) feeding the cluster. I'm running spark-1.4.1-bin-hadoop2.6 on Ubuntu with Java 8 (Oracle JVM).

My driver program is basically the following:

JavaReceiverInputDStream<String> messages = createInputDStream(ssc);
messages.mapPartitions(...)
        .mapToPair(...)
        .updateStateByKey(...)
        .foreachRDD(rdd -> {
            rdd.foreach(ne -> {
                // store the event in the database
            });
            return null; // Spark 1.4 Java API: foreachRDD takes Function<JavaRDD<T>, Void>
        });
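For reference, createInputDStream is not shown in the question; a minimal sketch of what such a helper might look like with the spark-streaming-mqtt connector is below (the broker URL and topic name are placeholders, not taken from the question):

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.mqtt.MQTTUtils;

// Hypothetical sketch only: the question does not show this helper.
// Assumes the spark-streaming-mqtt connector; broker URL and topic are placeholders.
static JavaReceiverInputDStream<String> createInputDStream(JavaStreamingContext ssc) {
    return MQTTUtils.createStream(ssc, "tcp://localhost:1883", "events");
}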

I already did an initial investigation. If I collect a class histogram of the driver process (jmap -histo; the columns are instance count, total bytes, and class name), I typically see something like this:

   1:         36388       81204168  [B
   2:        407486       32826432  [C
   3:         40849       25067224  [I
   4:        367245        8813880  scala.collection.immutable.$colon$colon
   5:        311000        7464000  java.lang.String
   6:        114294        7314816  org.apache.spark.storage.RDDInfo

I notice that, over time, the number of RDDInfo objects keeps increasing. A heap dump shows that the bulk of the RDDInfo objects is held in the stageIdToData map of the JobProgressListener. Looking at the code of that class, it seems it should take care of throwing away old data. I have therefore already set

spark.ui.retainedJobs      50
spark.ui.retainedStages    50

in conf/spark-defaults.conf. But that didn't help. From my dump, I see that this stageIdToData map contains 1897 entries. This looks strange to me given the above configuration settings.
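Equivalently, these limits can be set programmatically before the streaming context is created; a minimal sketch (the app name and batch interval are placeholders, not taken from the question):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch only: same UI retention limits as conf/spark-defaults.conf above,
// set on the SparkConf instead. App name and batch interval are placeholders.
SparkConf conf = new SparkConf()
    .setAppName("mqtt-streaming")
    .set("spark.ui.retainedJobs", "50")
    .set("spark.ui.retainedStages", "50");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(5));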

Am I doing something wrong here, or is this a Spark issue?

Upvotes: 3

Views: 2893

Answers (2)

Iulian Dragos

Reputation: 5712

Since you're using updateStateByKey, you must be checkpointing your streams. Try decreasing the checkpoint interval.
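To make that concrete, here is a minimal sketch of setting the checkpoint directory and an explicit checkpoint interval on the state stream (the directory, interval, and key/state types are placeholders; stateStream stands for the stream returned by updateStateByKey):

import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch only: directory, interval, and types are placeholders.
static void configureCheckpointing(JavaStreamingContext ssc,
                                   JavaPairDStream<String, String> stateStream) {
    // updateStateByKey requires a checkpoint directory to be set.
    ssc.checkpoint("hdfs:///checkpoints/mqtt-app");
    // Checkpoint the state stream explicitly; the interval must be a
    // multiple of the batch interval.
    stateStream.checkpoint(Durations.minutes(2));
}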

In your memory snapshot you have a bunch of arrays. Who is holding on to those?

Upvotes: 2

Fabien COMTE

Reputation: 324

Are you doing anything on the driver side of the application, like collecting data, processing it, and keeping it in memory?
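For example, a hypothetical pattern along these lines (not taken from the question, reusing the messages stream from it) would keep growing driver-side memory:

import java.util.ArrayList;
import java.util.List;

// Hypothetical anti-pattern only, not the asker's code: collect() pulls every
// batch back to the driver, and the driver-side list is never trimmed.
List<String> seen = new ArrayList<>();
messages.foreachRDD(rdd -> {
    seen.addAll(rdd.collect());
    return null;
});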

Upvotes: 0
