Reputation: 3316
I have a Spark program with calculates relations between users, i.e. it receives data set of type:
RDD[(java.lang.Long, Map[(String, String), Integer])]
Where the Long is timestamp, and the map is a score relevant to tuples of two users. and should run some function over the scores and return the following type:
Map[String, Map[java.lang.Long, java.lang.Double]]
Where the String is the first String in the tuple, and the map is the results of the function per timeslot.
In my case I have around 2000 users so the maps I receive are quite big (2000^2 per timeslot), and also the results relies on the previous timeslot results.
I am running the program locally and receiving GC overhead limit exceeded
. I increased the heap memory to 14g using: -Xmx14G
in vmarguments (I see the java process is occupying more than 12g of memory) but it didn't help.
Currently implemented method
I have tried several directions to decrease the memory consumption and currently came up with the following idea: since every timestamp relies only on the previous one I will collect every timeslot separately and keep the previous results on driver. In this manner I will run calculations only on part of the data and hopefully it will not crush the program.
The code:
def calculateScorePerTimeslot(scorePerTimeslotRDD: RDD[(java.lang.Long, Map[(String, String), Integer])]): Map[String, Map[java.lang.Long, java.lang.Double]] = {
var distancesPerTimeslotVarRDD = distancesPerTimeslotRDD.groupBy(_._1).sortBy(_._1)
println("Start collecting all the results - cache the data!!")
distancesPerTimeslotVarRDD.cache()
println("Caching all the data has completed!")
while(!distancesPerTimeslotVarRDD.isEmpty())
{
val dataForTimeslot: (java.lang.Long, Iterable[(java.lang.Long, Map[(String, String), Integer])]) = distancesPerTimeslotVarRDD.first()
println("Retrieved data for timeslot: " + dataForTimeslot._1)
//Code which is irrelevant for question - logic
println("Removing timeslot: " + dataForTimeslot._1)
distancesPerTimeslotVarRDD = distancesPerTimeslotVarRDD.filter(t => !t._1.equals(dataForTimeslot._1))
println("Filtering has complete! - without: " + dataForTimeslot._1)
}
}
Summary: Basically, the idea is to extract one timeslot at a time process it and save the results at driver - in this manner I try to reduce the size of data which passes on collect
.
Reason I write this post
Unfortunately, this doesn't help me and the program still dies. My question is: is this manner of taking the first() item of a RDD and then filter it have the effect of iterating over the items on RDD? Are there other better ideas to tackle this kinds of question (better ideas which are not increasing the memory or moving to a real distributed cluster)?
Upvotes: 0
Views: 2058
Reputation: 11383
Firstly, RDD[(java.lang.Long, Map[(String, String), Integer])]
uses more memory than RDD[(java.lang.Long, Array[(String, String, Integer)])]
. You'll save some memory if you can use the latter.
Secondly, your loop is pretty inefficient in caching data. Always call unpersist
on any RDD you no longer need.
distancesPerTimeslotVarRDD.cache()
var rddSize = distancesPerTimeslotVarRDD.count()
println("Caching all the data has completed!")
while(rddSize > 0) {
val prevRDD = distancesPerTimeslotVarRDD
val dataForTimeslot = distancesPerTimeslotVarRDD.first()
println("Retrieved data for timeslot: " + dataForTimeslot._1)
// Code which is irrelevant for answer - logic
println("Removing timeslot: " + dataForTimeslot._1)
// Cache the new value of distancesPerTimeslotVarRDD
distancesPerTimeslotVarRDD = distancesPerTimeslotVarRDD.filter(t => !t._1.equals(dataForTimeslot._1)).cache()
// Force calculation so we can throw away previous iteration value
rddSize = distancesPerTimeslotVarRDD.count()
println("Filtering has complete! - without: " + dataForTimeslot._1)
// Get rid of previously cached RDD
prevRDD.unpersist(false)
}
Thirdly, you can try using Kryo Serializer, though this sometimes makes things worse. You have to configure the serializer and replace cache
with persist(StorageLevel.MEMORY_ONLY_SER)
Upvotes: 1