elelias

Reputation: 4779

Spark connection to Cassandra fails to open native connection

This is my build.sbt

name := "cassandra_test"

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq("com.databricks" %% "spark-avro" % "2.0.1",
"org.apache.spark" %% "spark-sql" % "1.6.0",
"org.apache.spark" %% "spark-core" % "1.6.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0")

and I have installed Cassandra 3.2 (EDIT: now using 2.1.12). The relevant contents of the yaml are, I believe, the correct ones from what I have gathered:

rpc_address: 127.0.0.1 (EDIT: now 0.0.0.0)
rpc_port: 9160
start_rpc: true
broadcast_rpc_address: 127.0.0.1
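
For completeness, the native-transport settings in the same yaml (which, as far as I can tell, are what the connector actually uses rather than the rpc/Thrift ones above) should be at their stock defaults, assuming I haven't changed them:

start_native_transport: true
native_transport_port: 9042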

I am able to start sessions via cqlsh, create tables, run queries and such.

However, when I try to use the spark connection, I always get:

 java.io.IOException: Failed to open native connection to Cassandra at {127.0.0.1}:9160

These are the contents of my Scala code:

import com.datastax.spark.connector._
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object cassandra_test{

  def main(args: Array[String]){

    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.connection.port", "9160")
    val sc = new SparkContext("local", "Cassandra Connector Test", conf)
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.count)
   }
}

I've looked around for answers, but I think I've covered all the possibilities that I've seen. I have also tried replacing the localhost IP address with the machine's actual IP address. Doing that, I can connect via cqlsh but not via Spark.

I also tried replacing "127.0.0.1" with "localhost" when setting up conf, but to no avail.

The Cassandra connector also seems to be the right version, according to the GitHub page.

I'm out of ideas of what to try, unfortunately!

What could be wrong?

=====================================================

EDIT: So I finally solved this. First, I compiled the connector library locally so that I could link it as a jar file and launch spark-shell with:

spark-shell --jars /mnt/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.6.0-M1-12-gc37bb3c.jar --master local
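
(The assembly jar itself came from building the connector repo locally; roughly the following, assuming the standard sbt setup of the project:)

cd /mnt/spark-cassandra-connector
sbt assembly
# produces spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-*.jar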

Note the --master local flag. That was the important piece that was missing. Then I would run in the shell:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql

sc.stop
import com.datastax.spark.connector._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)  // recreate the context so it picks up the Cassandra conf
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)

This would always fail without the --master local part of the shell command.

In the Scala code, this means that changing the definition of conf to:

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost").setMaster("local")

did the trick too.
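
Putting it together, a minimal sketch of the full corrected program (same test.kv table as above; assumes the connector jar is on the classpath):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object cassandra_test {
  def main(args: Array[String]): Unit = {
    // host is enough: the connector defaults to the native protocol port (9042),
    // and setMaster("local") runs the job without an external cluster manager
    val conf = new SparkConf(true)
      .setAppName("Cassandra Connector Test")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.count)
    sc.stop()
  }
}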

I still don't really understand what is going on; if anybody cares to explain, that would be awesome.

Upvotes: 0

Views: 1723

Answers (2)

Daniel Zolnai

Reputation: 16920

The only difference between your working and non-working code is using localhost instead of 127.0.0.1 for the key spark.cassandra.connection.host; this should be the root cause of your problem.

A somewhat unrelated tip: you can launch the spark-shell with the flag --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0-M1 to avoid compiling the jar yourself.
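
For example, something along these lines (the exact coordinates are worth double-checking against the connector readme):

spark-shell --master local --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0-M1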

Upvotes: 1

Daniel Zolnai

Reputation: 16920

The Spark Cassandra connector does not support Cassandra 3.2. You should use a lower version; see the version compatibility sheet in the readme of their GitHub repo.

Upvotes: 1
