Reputation: 3868
I am working with a dataset of several hundred GB (around 2B rows). One of the operations is to reduce an RDD of Scala case objects (containing doubles, maps, and sets) into a single entity per key. Initially the operation used groupByKey,
but it was slow and caused heavy GC, so I converted it to aggregateByKey
and later even to reduceByKey,
hoping to avoid the high user-memory allocation, shuffle activity, and GC pressure I was hitting with groupBy.
Application resources: 23GB executor memory + 4GB overhead, 20 instances with 6 cores each. Played with the shuffle ratio from 0.2 to 0.4.
Available cluster resources: 10 nodes, 600GB total for YARN, 32GB max container size.
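For reference, here is a sketch of those resources written out as Spark 1.x configuration keys (I'm assuming the "shuffle ratio" refers to spark.shuffle.memoryFraction; the app name is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the resources quoted above, expressed as standard Spark/YARN config keys.
val conf = new SparkConf()
  .setAppName("user-aggregation")                      // placeholder name
  .set("spark.executor.memory", "23g")
  .set("spark.yarn.executor.memoryOverhead", "4096")   // MB
  .set("spark.executor.instances", "20")
  .set("spark.executor.cores", "6")
  .set("spark.shuffle.memoryFraction", "0.3")          // tried values between 0.2 and 0.4
val sc = new SparkContext(conf)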
2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5] org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xa8147f0c, /10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-14] org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-05-02 22:39:05,783 ERROR [sparkDriver-akka.actor.default-dispatcher-2] akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
About the job: read an input dataset with around 20 fields and 1B-2B rows, and create an output dataset aggregated over 10 unique fields, which basically become the query criteria. Out of those 10, 3 fields represent various combinations of their values, so that we don't have to query multiple records to get a set. Of those 3 fields, say a, b, and c have 11, 2, and 2 possible values respectively, so we could get at most (2^11 - 1) * (2^2 - 1) * (2^2 - 1) = 2047 * 3 * 3 = 18,423 combinations for a given key.
//pseudo code where I use aggregateByKey
//UserDataSet has about 10 fields, 5 of them maps
case class UserDataSet(salary: Double, members: Int, clicks: Map[Int, Long],
                       businesses: Map[Int, Set[Int]] /* ...remaining fields elided */)

def main() = {
  // create combinationRDD of type (String, Set[Set]) from the input dataset,
  // representing all the combinations
  // create joinedRdd of type (String, UserDataSet) - the key at this point is already
  // the final key containing the 10 unique fields; the value is a UserDataSet

  // This is where things fail
  val finalDataSet = joinedRdd.aggregateByKey(UserDataSet.getInstance())(processDataSeq, processDataMerge)
}

private def processDataMerge(map1: UserDataSet, map2: UserDataSet) = {
  map1.clicks ++= map2.clicks // deep merge, of course, to avoid overwriting map keys
  map1.salary += map2.salary
  map1
}
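Below is a minimal, runnable sketch of the same aggregateByKey pattern, trimmed down so it compiles on its own; the reduced UserDataSet fields, the mergeClicks helper, and the sample data are illustrative stand-ins, not the original code:

import org.apache.spark.{SparkConf, SparkContext}

object AggregateSketch {

  case class UserDataSet(salary: Double = 0.0,
                         members: Int = 0,
                         clicks: Map[Int, Long] = Map.empty)

  // "Deep" merge of the click maps: sum counts instead of overwriting colliding keys.
  def mergeClicks(a: Map[Int, Long], b: Map[Int, Long]): Map[Int, Long] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  // seqOp: fold one record into the per-partition accumulator.
  def processDataSeq(acc: UserDataSet, rec: UserDataSet): UserDataSet =
    UserDataSet(acc.salary + rec.salary,
                acc.members + rec.members,
                mergeClicks(acc.clicks, rec.clicks))

  // combOp: merge two per-partition accumulators (same operation here).
  def processDataMerge(a: UserDataSet, b: UserDataSet): UserDataSet = processDataSeq(a, b)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("agg-sketch").setMaster("local[*]"))

    // Stand-in for the real joinedRdd of (finalKey, UserDataSet).
    val joinedRdd = sc.parallelize(Seq(
      ("key-1", UserDataSet(100.0, 1, Map(1 -> 2L))),
      ("key-1", UserDataSet(200.0, 1, Map(1 -> 1L, 2 -> 5L))),
      ("key-2", UserDataSet(50.0, 1, Map(3 -> 7L)))
    ))

    val finalDataSet = joinedRdd.aggregateByKey(UserDataSet())(processDataSeq, processDataMerge)
    finalDataSet.collect().foreach(println)
    sc.stop()
  }
}

Whether the accumulator is rebuilt immutably (as here) or mutated in place, aggregateByKey keeps only one accumulator per key per partition, which is what avoids the per-key value buffering that made groupByKey so GC-heavy.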
Upvotes: 1
Views: 1166
Reputation: 3868
So the issue was indeed the driver running out of memory, not the executors; hence the error was in the driver logs. Duh. It wasn't very clear from the logs, though. The driver ran out because 1) it was using the default of -Xmx900m, and 2) the Spark driver relies on Akka, and Akka relies on the stubborn JavaSerializer, which serializes objects into a byte array instead of a stream. As a temporary solution I increased spark.driver.memory to 4096m in my case, and I haven't seen a memory error since. Thanks everyone for the insights into the problem space, though.
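In case it helps anyone else: spark.driver.memory has to be set before the driver JVM starts, so it must go through --driver-memory / --conf spark.driver.memory=4096m on spark-submit or spark-defaults.conf; setting it programmatically in SparkConf inside the application comes too late. A tiny sanity check (my own sketch, not part of the original job) is to print the driver's max heap at startup:

object DriverHeapCheck {
  def main(args: Array[String]): Unit = {
    // With the default driver heap this reports roughly 900 MB; after raising
    // spark.driver.memory to 4096m it should report close to the new value.
    val maxHeapMb = Runtime.getRuntime.maxMemory() / (1024 * 1024)
    println(s"Driver max heap: $maxHeapMb MB")
  }
}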
Upvotes: 2
Reputation: 5213
To be able to help, you should post the code and also give an explanation of the input data.
Why the data? When aggregating by key, to achieve optimal parallelism and avoid issues, it's important to have an idea of what the key distribution looks like, and also its cardinality.
Let me explain what those are and why they matter. Let's say you're aggregating by country: there are about 250 countries on Earth, so the cardinality of the key is around 250.
Cardinality is important because low cardinality may stifle your parallelism. For instance, if 90% of your data is for the US, and you have 250 nodes, one node will be processing 90% of the data.
That leads to the concept of distribution: when you're grouping by key, the number of values you have per key is your value distribution. For optimal parallelism, you ideally want roughly the same number of values for every key.
Now, if the cardinality of your data is very high but the value distribution is not optimal, statistically things should even out. For instance, say you have Apache logs where most users only visit a few pages but some visit many (as is the case with robots). If the number of users is much greater than the number of nodes, the users with lots of data get spread around the nodes, so parallelism is not affected that much.
Problems usually arise when you use keys with low cardinality. If the distribution of the values is not good, it causes issues, not unlike an unbalanced washing machine.
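Concretely, before running the expensive aggregation you can eyeball both the cardinality and the skew with something like this (a sketch; joinedRdd stands for the question's RDD of (key, UserDataSet) pairs):

// Records per key, computed with a cheap shuffle.
val keyCounts = joinedRdd
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)

val cardinality = keyCounts.count()                               // number of distinct keys
val heaviest = keyCounts.sortBy(_._2, ascending = false).take(10) // the 10 most skewed keys

println(s"distinct keys: $cardinality")
heaviest.foreach { case (k, n) => println(s"$k -> $n records") }

If the heaviest keys hold a disproportionate share of the 1B-2B rows, that is where the imbalance (and the memory pressure on a few tasks) will come from.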
Last but not least, it also depends greatly on what you're doing in the aggregateByKey itself. You can easily exhaust memory if you're leaking objects in either the map or the reduce phase of the processing.
Upvotes: 1