Reputation: 3868
I have a spark jobs that runs on yarn, it works with about 150gb of dataset and does multiple shuffle operations and finally stores data into hbase. It keeps failing at saveAsHadoopDataset
Basically multiple Executors fails at this stage after reporting high GC activities. However none of the executor logs, driver logs or node manager logs indicate any OutOfMemory errors or GC Overhead Exceeded errors or memory limits exceeded errors. I don't see any other reason for Executor failures as well in spark ui as well.
val hConf = HBaseConfiguration.create
hConf.setInt("hbase.client.scanner.caching", 10000)
hConf.setBoolean("hbase.cluster.distributed", true)
new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
Driver Logs:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
Executor Logs:
16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0 (TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 (TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to maprnode5 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from maprnode5 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:744)
16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 6 outstanding blocks after 5000 ms
16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive connection to maprnode5, creating a new one.
16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in connection from maprnode5
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:744)
16/02/24 11:12:16 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from maprnode5 is closed
16/02/24 11:12:16 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches
Upvotes: 1
Views: 4748
Reputation: 3868
So it turns out although spark UI says it failed at saveAsHadoopDataSet
it was in fact failing at first step of the stage where saveAsHadoopDataSet
was the last step. To elaborate more, spark defines stage boundaries based on sequence of narrow transformation or sequence of combined wide transformation and narrow transformation. In my particular case, sequence was groupByKey(wide dep) -> mapValues(narrow dep) -> map(narrow dep)
where last map is actually doing saveAsHadoopDataSet
. Executor was reporting hight GC activity and memory usage at in fact shuffle stage groupByKey
. I changed my application logic to use reduceByKey
instead of groupByKey
. Now its super slow but at least not failing.
Upvotes: 1