Reputation: 1146
I have an issue with the DataStax spark-cassandra-connector. To test our Spark-Cassandra connection I use the code below. The problem is that after some time (about half an hour) it throws an exception. I think it is a connection issue. Can anybody help? I am stuck.
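import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import com.datastax.spark.connector.CassandraJavaUtil;          // connector 1.0.x package (assumed from the stack trace)
import com.datastax.spark.connector.SparkContextJavaFunctions;

// Config and ObjectHandler are the application's own classes
SparkConf conf = new SparkConf(true)
        .setMaster("local")
        .set("spark.cassandra.connection.host", Config.CASSANDRA_CONTACT_POINT)
        .setAppName(Config.CASSANDRA_DB_NAME)
        .set("spark.executor.memory", Config.Spark_Executor_Memory);
SparkContext javaSparkContext = new SparkContext(conf);
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(javaSparkContext);
// Test loop that never ends: each pass builds a new RDD over my_users and counts its rows
for (;;) {
    JavaRDD<ObjectHandler> obj = functions.cassandraTable(Config.CASSANDRA_DB_NAME,
            "my_users", ObjectHandler.class);
    System.out.println("#####" + obj.count() + "#####");
}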
Error:
java.lang.OutOfMemoryError: Java heap space
at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201)
at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323)
at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
19:11:12.311 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] Defuncting connection to /192.168.1.26:9042
com.datastax.driver.core.TransportException: [/192.168.1.26:9042] Unexpected exception triggered (java.lang.OutOfMemoryError: Java heap space)
at com.datastax.driver.core.Connection$Dispatcher.exceptionCaught(Connection.java:614)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
at org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:566)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.jboss.netty.buffer.HeapChannelBuffer.slice(HeapChannelBuffer.java:201)
at org.jboss.netty.buffer.AbstractChannelBuffer.readSlice(AbstractChannelBuffer.java:323)
at com.datastax.driver.core.CBUtil.readValue(CBUtil.java:247)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:395)
at com.datastax.driver.core.Responses$Result$Rows$1.decode(Responses.java:383)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:201)
at com.datastax.driver.core.Responses$Result$2.decode(Responses.java:198)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:182)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:66)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:310)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
... 3 more
19:11:13.549 DEBUG [New I/O worker #1612][com.datastax.driver.core.Connection] [/192.168.1.26:9042-1] closing connection
19:11:12.311 DEBUG [main][com.datastax.driver.core.ControlConnection] [Control connection] error on /192.168.1.26:9042 connection, no more host to try
com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out
at com.datastax.driver.core.DefaultResultSetFuture.onTimeout(DefaultResultSetFuture.java:138)
at com.datastax.driver.core.Connection$ResponseHandler$1.run(Connection.java:763)
at org.jboss.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:546)
at org.jboss.netty.util.HashedWheelTimer$Worker.notifyExpiredTimeouts(HashedWheelTimer.java:446)
at org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:395)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at java.lang.Thread.run(Thread.java:722)
19:11:13.551 DEBUG [main][com.datastax.driver.core.Cluster] Shutting down
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.1.26:9042 (com.datastax.driver.core.ConnectionException: [/192.168.1.26:9042] Operation timed out))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:195)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1143)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:166)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:72)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:108)
at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:131)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:206)
at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:205)
at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:212)
at com.datastax.spark.connector.SparkContextFunctions.cassandraTable(SparkContextFunctions.scala:48)
at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:47)
at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:89)
at com.datastax.spark.connector.SparkContextJavaFunctions.cassandraTable(SparkContextJavaFunctions.java:140)
at com.shephertz.app42.paas.spark.SegmentationWorker.main(SegmentationWorker.java:52)
Upvotes: 0
Views: 794
Reputation: 3190
Your error probably lies in how the JVM is configured. If the heap settings are not tuned correctly, garbage collection could be causing problems. If you are using Cassandra 2.0 or later, see DataStax's "Tuning Java Resources" documentation.
From that document, on how Cassandra uses memory:
Using a java-based system like Cassandra, you can typically allocate about 8GB of memory on the heap before garbage collection pause time starts to become a problem. Modern machines have much more memory than that and Cassandra can make use of additional memory as page cache when files on disk are accessed. Allocating more than 8GB of memory on the heap poses a problem due to the amount of Cassandra metadata about data on disk. The Cassandra metadata resides in memory and is proportional to total data. Some of the components grow proportionally to the size of total memory.
In Cassandra 1.2 and later, the Bloom filter and compression offset map that store this metadata reside off-heap, greatly increasing the capacity per node of data that Cassandra can handle efficiently. In Cassandra 2.0, the partition summary also resides off-heap.
Please post your JVM options for further help.
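To collect the JVM options asked for above, the following minimal sketch prints the maximum heap size, the arguments the JVM was started with, and cumulative garbage-collection counts and times, using only the standard java.lang.management API. The class name JvmInfo is just for illustration; the same calls could be placed at the start of the Spark job's main method. Note that with setMaster("local") the executor runs inside the driver JVM, so, as far as I know, spark.executor.memory does not raise the heap in that mode; the -Xmx of the JVM you launch is what applies.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

/**
 * Prints the JVM settings usually needed when diagnosing an OutOfMemoryError:
 * maximum heap size, raw JVM arguments and cumulative GC statistics.
 */
public class JvmInfo {

    /** Maximum heap the JVM will attempt to use, in megabytes (driven by -Xmx). */
    public static long maxHeapMb() {
        // maxMemory() returns Long.MAX_VALUE when there is no inherent limit
        return Runtime.getRuntime().maxMemory() / (1024 * 1024);
    }

    /** Arguments passed to the JVM at startup, e.g. -Xmx, -Xms and GC flags. */
    public static List<String> jvmArguments() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments();
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MB): " + maxHeapMb());
        System.out.println("JVM arguments: " + jvmArguments());
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Count and time may be -1 if the collector does not report them
            System.out.println(gc.getName() + ": " + gc.getCollectionCount()
                    + " collections, " + gc.getCollectionTime() + " ms total");
        }
    }
}
```

If the arguments list contains no -Xmx at all, the heap is the JVM's default, which is often much smaller than the machine's memory.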
Upvotes: 0
Reputation: 11638
It looks like you ran out of heap space:
java.lang.OutOfMemoryError: Java heap space
The Java driver (which the Spark connector uses to talk to Cassandra) defuncted a connection because an OutOfMemoryError was thrown while processing a request. When a connection is defuncted, the driver marks its host as down.
The NoHostAvailableException is most likely raised because all of your hosts were marked down after their connections were defuncted, which in turn was probably caused by the OutOfMemoryError.
Do you know why you may be getting an OutOfMemoryError? What is your heap size? Are you doing anything that would cause a lot of objects to be on heap in your JVM? Do you possibly have a memory leak?
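One way to check the memory-leak question is to sample used heap after every iteration of the test loop and see whether it keeps climbing. The sketch below is a hypothetical helper: HeapTrend, sample, record and isSteadilyGrowing are illustrative names, not part of Spark or the DataStax driver, and it only uses java.lang.Runtime. System.gc() is merely a hint, so individual samples are noisy, and the window size is an assumption you would tune.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: records used-heap samples (e.g. once per loop iteration)
 * and reports whether usage has risen on every one of the most recent samples.
 */
public class HeapTrend {
    private final List<Long> samples = new ArrayList<>();

    /** Requests a GC (only a hint), then records and returns the used heap in bytes. */
    public long sample() {
        Runtime rt = Runtime.getRuntime();
        System.gc();
        long used = rt.totalMemory() - rt.freeMemory();
        samples.add(used);
        return used;
    }

    /** Records an externally measured used-heap value in bytes. */
    public void record(long usedBytes) {
        samples.add(usedBytes);
    }

    /** True if each of the last {@code window} samples is larger than the one before it. */
    public boolean isSteadilyGrowing(int window) {
        if (window < 2 || samples.size() < window) {
            return false; // not enough data to judge a trend
        }
        for (int i = samples.size() - window + 1; i < samples.size(); i++) {
            if (samples.get(i) <= samples.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

In the question's loop you would create one HeapTrend before for (;;) and call trend.sample() right after obj.count(), printing the value. If isSteadilyGrowing(10) becomes true, used heap rose across the last ten samples, which points to objects being retained between iterations rather than to a network problem.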
Upvotes: 1