HackCode

Reputation: 1847

Cannot connect to cassandra from Spark

I have some test data in Cassandra. When I try to fetch this data from Spark, I get the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o25.load.

java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042

This is what I've done till now:

  1. Started ./bin/cassandra
  2. Created test data using CQL in keyspace "testkeyspace2", table "emp", with some keys and corresponding values.
  3. Wrote standalone.py
  4. Ran the following pyspark shell command.

    sudo ./bin/spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.0.jar \
    --packages TargetHolding:pyspark-cassandra:0.2.4 \
    examples/src/main/python/standalone.py
    
  5. Got the mentioned error.


standalone.py:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Stand Alone Python Script")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
loading = sqlContext.read.format("org.apache.spark.sql.cassandra")\
                         .options(table="emp", keyspace="testkeyspace2")\
                         .load()\
                         .show()

I also tried with --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 but I'm getting the same error.


Debug:

I checked

netstat -tulpn | grep -i listen | grep <cassandra_pid>

and saw that it is listening on port 9042.
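Note that netstat shows the listening port but not necessarily which loopback address accepts connections; the error above points at 127.0.1.1 specifically. A quick sketch using only the Python standard library (no Spark or Cassandra APIs) can check both addresses directly:

```python
import socket

def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Cassandra binds to the address given by rpc_address in cassandra.yaml,
# so only one of these loopback addresses may actually answer.
for addr in ("127.0.0.1", "127.0.1.1"):
    print(addr, "open" if can_connect(addr, 9042) else "closed")
```

If 127.0.0.1:9042 is open but 127.0.1.1:9042 is closed, the problem is address resolution rather than Cassandra being down.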


Full log trace:

Traceback (most recent call last):
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/examples/src/main/python/standalone.py", line 8, in <module>
    .options(table="emp", keyspace = "testkeyspace2")\
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 139, in load
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 45, in deco
  File "~/Dropbox/Work/ITNow/spark/spark-1.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.load.
: java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:176)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:203)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.1.1:9042 (com.datastax.driver.core.TransportException: [/127.0.1.1:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:227)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:82)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1307)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:339)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
    ... 22 more

Am I doing something wrong?

I'm really new to all this so I could use some advice. Thanks!

Upvotes: 2

Views: 5385

Answers (3)

Fthi.a.Abadi

Reputation: 322

Add this next to your --packages dependency; it worked perfectly fine for me:

    --conf spark.cassandra.connection.host="127.0.0.1"

Upvotes: 0

Arsinux

Reputation: 173

I checked my Linux hosts file at /etc/hosts, and the content was:

127.0.0.1       localhost
127.0.1.1       <my hostname>

I changed it to:

127.0.0.1       localhost
127.0.0.1       <my hostname>

and it worked fine.

As you can see on line 58 of your own log, it mentions "Your hostname, ganguly resolves to a loopback address: 127.0.1.1; using 192.168.1.32 instead (on interface wlan0)", which I guess applies to your case as well.

Upvotes: 0

Andy Tolbert

Reputation: 11638

Based on our conversations in the question comments, the issue is that 'localhost' was used for rpc_address in your cassandra.yaml file. Cassandra used the OS to resolve 'localhost' to 127.0.0.1 and listened on that interface explicitly.

To fix this, you either need to update rpc_address to 127.0.1.1 in cassandra.yaml and restart Cassandra, or update your SparkConf to reference 127.0.0.1, i.e.:

conf = SparkConf().setAppName("Stand Alone Python Script") \
                  .set("spark.cassandra.connection.host", "127.0.0.1")

Although one thing that seems odd to me is that spark.cassandra.connection.host also defaults to 'localhost', so it is weird to me that the spark cassandra connector resolved 'localhost' as '127.0.1.1' yet cassandra resolved it as '127.0.0.1'.
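One way to see where the mismatch comes from is to print how the OS resolves the relevant names. A small standard-library sketch (no Spark or Cassandra required; hostname resolution behavior varies by system):

```python
import socket

# 'localhost' almost always resolves to 127.0.0.1, but on Debian/Ubuntu
# systems /etc/hosts typically maps the machine's own hostname to 127.0.1.1,
# which is where the {127.0.1.1}:9042 address in the error can come from.
print("localhost ->", socket.gethostbyname("localhost"))
try:
    hostname = socket.gethostname()
    print(hostname, "->", socket.gethostbyname(hostname))
except socket.gaierror:
    print("hostname did not resolve")
```

If the two lines print different loopback addresses, the connector and Cassandra can end up on different interfaces even though both were configured with names rather than explicit IPs.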

Upvotes: 3
