Shashank Shukla

Reputation: 1146

Spark Cassandra connector connection error, no more host to try

I have an issue related to the DataStax Spark-Cassandra connector. When I try to test our Spark-to-Cassandra connection, I use the code below. My problem is that this code throws an exception after some time (roughly half an hour). I think there is a connection issue somewhere; can anybody help? I am stuck.

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import com.datastax.spark.connector.CassandraJavaUtil;
    import com.datastax.spark.connector.SparkContextJavaFunctions;

    // Configure the Cassandra connection and create a single SparkContext.
    SparkConf conf = new SparkConf(true)
            .setMaster("local")
            .set("spark.cassandra.connection.host", Config.CASSANDRA_CONTACT_POINT)
            .setAppName(Config.CASSANDRA_DB_NAME)
            .set("spark.executor.memory", Config.Spark_Executor_Memory);
    SparkContext javaSparkContext = new SparkContext(conf);
    SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(javaSparkContext);

    // Repeatedly read the table and print its row count to exercise the connection.
    for (;;) {
        JavaRDD<ObjectHandler> obj = functions.cassandraTable(Config.CASSANDRA_DB_NAME,
                "my_users", ObjectHandler.class);
        System.out.println("#####" + obj.count() + "#####");
    }

Error:

java.lang.OutOfMemoryError: Java heap space
at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201)
at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323)
at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
19:11:12.311 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] Defuncting connection to /192.168.1.26:9042
com.datastax.driver.core.TransportException: [/192.168.1.26:9042] Unexpected exception triggered (java.lang.OutOfMemoryError: Java heap space)
    at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:614)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
    at org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:566)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201)
    at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323)
    at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247)
    at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395)
    at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383)
    at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201)
    at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198)
    at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    ... 3 more
19:11:13.549 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] [/192.168.1.26:9042-1] closing connection
19:11:12.311 DEBUG [main][com.datastax.driver.core.ControlConnection] [Control connection] error on /192.168.1.26:9042 connection, no more host to try
com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out
    at com.datastax.driver.core.DefaultResultSetFuture.onTimeout(DefaultResultSetFuture.java:138)
    at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:763)
    at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:546)
    at org.jboss.netty.util.HashedWheelTimer$Worker.notifyExpiredTimeouts(HashedWheelTimer.java:446)
    at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:395)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at java.lang.Thread.run(Thread.java:722)
19:11:13.551 DEBUG [main][com.datastax.driver.core.Cluster] Shutting down
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.26:9042 (com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:195)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1143)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:166)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:72)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:131)
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:206)
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:205)
    at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:212)
    at com.datastax.spark.connector.SparkContextFunctions.cassandraTable(SparkContextFunctions.scala:48)
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:47)
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:89)
    at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:140)
    at com.shephertz.app42.paas.spark.SegmentationWorker.main(SegmentationWorker.java:52)

Upvotes: 0

Views: 794

Answers (2)

Nathan

Reputation: 3190

Your error probably lies in how the JVM is configured. If the settings are not tuned correctly, garbage collection could be causing issues. If you are using Cassandra > 2.0, see DataStax's "Tuning Java resources" documentation.

From the "How Cassandra uses memory" section of that document:

Using a java-based system like Cassandra, you can typically allocate about 8GB of memory on the heap before garbage collection pause time starts to become a problem. Modern machines have much more memory than that and Cassandra can make use of additional memory as page cache when files on disk are accessed. Allocating more than 8GB of memory on the heap poses a problem due to the amount of Cassandra metadata about data on disk. The Cassandra metadata resides in memory and is proportional to total data. Some of the components grow proportionally to the size of total memory.

In Cassandra 1.2 and later, the Bloom filter and compression offset map that store this metadata reside off-heap, greatly increasing the capacity per node of data that Cassandra can handle efficiently. In Cassandra 2.0, the partition summary also resides off-heap.

Please post your JVM options for further help.
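
For reference, here is a minimal sketch of where the heap-related Spark settings usually live; the "4g" values are placeholders, not recommendations:

    // Illustrative only: heap-related Spark settings to check or tune.
    SparkConf conf = new SparkConf(true)
            .set("spark.executor.memory", "4g")  // heap of each executor JVM (placeholder value)
            .set("spark.driver.memory", "4g");   // not applied once the driver JVM is already running
                                                 // (e.g. local mode); set it at launch instead:
                                                 // spark-submit --driver-memory 4g ...  or  -Xmx4g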

Upvotes: 0

Andy Tolbert

Reputation: 11638

It looks like you ran out of heap space:

java.lang.OutOfMemoryError: Java heap space

The Java driver (which the Spark connector uses to interact with Cassandra) defuncted a connection because an OutOfMemoryError was thrown while processing a request. When a connection is defuncted, its host is brought down.

The NoHostAvailableException is likely being raised because all of your hosts were brought down after their connections were defuncted by that OutOfMemoryError.

Do you know why you might be getting an OutOfMemoryError? What is your heap size? Are you doing anything that would put a lot of objects on the heap in your JVM? Could you have a memory leak?
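
If you are not sure how much heap the driver actually has, a quick (purely illustrative) check from inside your application is:

    // Prints the maximum heap the current JVM can grow to (controlled by -Xmx / --driver-memory).
    long maxHeapBytes = Runtime.getRuntime().maxMemory();
    System.out.println("Max heap: " + (maxHeapBytes / (1024 * 1024)) + " MB");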

Upvotes: 1
