Fanooos

Reputation: 2828

Spark Streaming job gets killed after running for about 1 hour

I have a Spark Streaming job that reads a stream of tweets from Gnip and writes them to Kafka.

Spark and Kafka are running on the same cluster.

My cluster consists of 5 nodes: kafka-b01 ... kafka-b05.

The Spark master is running on kafka-b05.

Here is how we submit the Spark job:

nohup sh $SPARK_HOME/bin/spark-submit --total-executor-cores 5 --class com.test.java.gnipStreaming.GnipSparkStreamer --master spark://kafka-b05:7077 GnipStreamContainer.jar powertrack kafka-b01,kafka-b02,kafka-b03,kafka-b04,kafka-b05 gnip_live_stream 2 &

After about 1 hour, the Spark job gets killed.

The logs in the nohup file show the following exception:

org.apache.spark.storage.BlockFetchException: Failed to fetch block from 2 locations. Most recent failure cause: 
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:595) 
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585) 
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
        at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585) 
        at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:570) 
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:630) 
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:48) 
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
        at org.apache.spark.scheduler.Task.run(Task.scala:89) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
        at java.lang.Thread.run(Thread.java:745) 
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioSocketChannel 
        at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:455) 
        at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306) 
        at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:134) 
        at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:116) 
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:211) 
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167) 
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90) 
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) 
        at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:99) 
        at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89) 
        at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588) 
        ... 15 more 
Caused by: io.netty.channel.ChannelException: Failed to open a socket. 
        at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:62) 
        at io.netty.channel.socket.nio.NioSocketChannel.<init>(NioSocketChannel.java:72) 
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
        at java.lang.Class.newInstance(Class.java:442) 
        at io.netty.bootstrap.AbstractBootstrap$BootstrapChannelFactory.newChannel(AbstractBootstrap.java:453) 
        ... 26 more 
Caused by: java.net.SocketException: Too many open files 
        at sun.nio.ch.Net.socket0(Native Method) 
        at sun.nio.ch.Net.socket(Net.java:411) 
        at sun.nio.ch.Net.socket(Net.java:404) 
        at sun.nio.ch.SocketChannelImpl.<init>(SocketChannelImpl.java:105) 
        at sun.nio.ch.SelectorProviderImpl.openSocketChannel(SelectorProviderImpl.java:60) 
        at io.netty.channel.socket.nio.NioSocketChannel.newSocket(NioSocketChannel.java:60) 
        ... 33 more

I have increased the maximum number of open files to 3275782 (the old limit was roughly half that), but I am still facing the same issue.
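To double-check whether file descriptors are actually leaking inside the executor JVMs (rather than the limit simply being too low), the open descriptor count can be logged from inside the job. A minimal sketch, assuming the executors run on a HotSpot/OpenJDK JVM on Linux; FdMonitor is just an illustrative name, not part of Spark or Kafka:

import java.lang.management.ManagementFactory;
import com.sun.management.UnixOperatingSystemMXBean;

// Illustrative helper (not part of Spark or Kafka): logs how many file
// descriptors this JVM currently holds. Calling it periodically from the
// executors shows whether the count climbs steadily toward the limit.
public final class FdMonitor {
    public static void logOpenFileDescriptors() {
        Object os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            System.out.println("open fds: " + unix.getOpenFileDescriptorCount()
                    + " / max: " + unix.getMaxFileDescriptorCount());
        }
    }
}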

When I checked the stderr logs of the workers from the Spark web interface, I found another exception:

java.nio.channels.ClosedChannelException 
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75) 
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74) 
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119) 
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
        at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) 
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:188) 
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:152) 
        at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) 
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
        at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:151) 
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) 
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) 
        at kafka.producer.Producer.send(Producer.scala:77) 
        at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
        at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:59) 
        at com.test.java.gnipStreaming.GnipSparkStreamer$1$1.call(GnipSparkStreamer.java:51) 
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) 
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920) 
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
        at org.apache.spark.scheduler.Task.run(Task.scala:89) 
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
        at java.lang.Thread.run(Thread.java:745)

The second exception seems to be related to Kafka, not Spark.

What do you think the problem is?

EDIT

Based on a comment from Yuval Itzchakov, here is the code of the streamer:

The main class: http://pastebin.com/EcbnQQ3a

The custom receiver class: http://pastebin.com/3UFPktKR

Upvotes: 3

Views: 2067

Answers (1)

Yuval Itzchakov

Reputation: 149538

The problem is that you're instantiating a new Producer instance on each iteration of DStream.foreachPartition. If you have a data-intensive stream, this can cause a lot of producers to be allocated, each attempting to connect to Kafka.

The first thing I'd make sure of is that you're properly closing the producer once you're done sending the data, using a finally block and calling producer.close():

public Void call(JavaRDD<String> rdd) throws Exception {
    rdd.foreachPartition(new VoidFunction<Iterator<String>>() {

        @Override
        public void call(Iterator<String> itr) throws Exception {
            // Declare the producer outside the try block so it is in scope in finally.
            Producer<String, String> producer = getProducer(hosts);
            try {
                while (itr.hasNext()) {
                    try {
                        KeyedMessage<String, String> message =
                            new KeyedMessage<String, String>(topic, itr.next());
                        producer.send(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                producer.close();
            }
        }
    });
    return null;
}

If that still doesn't work and you're seeing too many connections, I'd create an object pool for Kafka producers which you can borrow from on demand. That way, you explicitly control the number of producers in use and the number of sockets you open.
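For illustration, a minimal sketch of such a pool, assuming the same Kafka 0.8 producer API used above (ProducerPool and MAX_PRODUCERS are arbitrary names, not part of any API):

import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

// Illustrative sketch: a bounded pool of producers shared within an executor JVM,
// e.g. held in a static field so all partitions on that executor reuse it.
public class ProducerPool {
    private static final int MAX_PRODUCERS = 4; // arbitrary cap

    private final BlockingQueue<Producer<String, String>> pool =
            new ArrayBlockingQueue<Producer<String, String>>(MAX_PRODUCERS);

    public ProducerPool(String hosts) {
        Properties props = new Properties();
        props.put("metadata.broker.list", hosts);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        for (int i = 0; i < MAX_PRODUCERS; i++) {
            pool.offer(new Producer<String, String>(config));
        }
    }

    // Blocks until a producer is free, so at most MAX_PRODUCERS
    // connections to Kafka are open from this JVM at any time.
    public Producer<String, String> borrow() throws InterruptedException {
        return pool.take();
    }

    public void release(Producer<String, String> producer) {
        pool.offer(producer);
    }
}

Inside foreachPartition you would then borrow() a producer instead of creating one, send the messages, and release() it in the finally block instead of closing it.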

Upvotes: 3
