Reputation: 31
I have a problem with the driver process in a Spark Streaming application: it goes out of memory. Neither the master nor the worker nodes show any problem (they all run fine for days on end). But even with a very limited feed (two messages every 5 minutes, processing of one message takes less than 100 ms), I get OOM errors in the driver process after some time (e.g. a weekend).
Here are more details:
I have a simple Spark Streaming application that consumes events from an MQTT data source and stores them in a database. I'm using a small Spark cluster with 1 master and 2 worker nodes. I have 1 driver process (started using spark-submit with deploy-mode client) feeding the cluster. I'm running spark-1.4.1-bin-hadoop2.6 on Ubuntu with Java 8 (Oracle JVM).
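For reference, the driver is launched with something along these lines (master URL, class, and jar name are placeholders, not my exact command):

spark-submit --master spark://master-host:7077 --deploy-mode client \
  --class com.example.MqttStreamingApp mqtt-streaming-app.jar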
My driver program is basically the following:
JavaReceiverInputDStream<String> messages = createInputDStream(ssc);
messages.mapPartitions(...)
        .mapToPair(...)
        .updateStateByKey(...)
        .foreachRDD(rdd -> {
            rdd.foreach(ne -> {
                // store the event in the database
            });
            return null; // Spark 1.4's Java foreachRDD takes a Function<JavaRDD<T>, Void>
        });
I already did an initial investigation. If I collect a histogram of instances in the driver process (jmap -histo), I typically see something like this:
 num     #instances         #bytes  class name
----------------------------------------------
   1:         36388       81204168  [B
   2:        407486       32826432  [C
   3:         40849       25067224  [I
   4:        367245        8813880  scala.collection.immutable.$colon$colon
   5:        311000        7464000  java.lang.String
   6:        114294        7314816  org.apache.spark.storage.RDDInfo
I notice that, over time, the number of RDDInfo objects keeps increasing. A heap dump shows that the bulk of the RDDInfo objects is kept inside the stageIdToData map of the JobProgressListener. Looking at the code of that class, it seems it should take care of throwing away old data. As such, I've already set
spark.ui.retainedJobs 50
spark.ui.retainedStages 50
in conf/spark-defaults.conf, but that didn't help. From my dump, I see that this stageIdToData map contains 1897 entries, which looks strange to me given the above configuration settings.
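As an aside, I also tried setting the same limits programmatically when the context is created; a minimal sketch (app name and batch interval are illustrative, the property names are the same as above):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingSetup {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("mqtt-to-db") // placeholder app name
                .set("spark.ui.retainedJobs", "50")
                .set("spark.ui.retainedStages", "50");
        // batch interval is illustrative
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.minutes(5));
        // ... build the DStream pipeline shown above, then ssc.start() ...
    }
}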
Am I doing something wrong here or is this a spark issue?
Upvotes: 3
Views: 2893
Reputation: 5712
Since you're using updateStateByKey, you must be checkpointing your streams. Try decreasing the checkpoint interval.
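For example, assuming stateStream is the result of your updateStateByKey call, a minimal sketch (the checkpoint directory and interval are illustrative):

// Checkpointing is mandatory for updateStateByKey; the directory is a placeholder.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoints");

// Set an explicit checkpoint interval on the stateful stream so its lineage
// (and the bookkeeping that goes with it) is truncated more often.
stateStream.checkpoint(Durations.minutes(10));

(Durations is org.apache.spark.streaming.Durations.)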
Also, your memory snapshot shows a lot of byte, char, and int arrays ([B, [C, [I). Who is holding on to those?
Upvotes: 2
Reputation: 324
Are you doing anything on the driver side of the application, like collecting data back to the driver and processing or keeping it in memory there?
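For instance, a pattern like the following (purely illustrative, not claiming this is your code) would make the driver heap grow without bound:

// Anti-pattern sketch: collect() ships every partition back to the driver,
// and this driver-side list keeps growing across batches.
List<String> seenOnDriver = new ArrayList<>();
messages.foreachRDD(rdd -> {
    seenOnDriver.addAll(rdd.collect());
    return null; // Spark 1.4's Java foreachRDD returns Void
});

(List and ArrayList are java.util.)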
Upvotes: 0