Neil Sharma

Reputation: 23

Spark Cassandra connection through Java client

I want to connect to my Scylla DB/Cassandra from a Spark job and execute a lookup query using the Java client. I tried the following:

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .master("local[1]")
  .getOrCreate()


import spark.implicits._
val m = Map( "John" -> 2 )
val df = m.toSeq.toDF("first", "id")
df.show

val vdf = df.mapPartitions(p => {
  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect("MyKeySpace")

  val res = p.map(record => {
    val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='${record.get(1)}' and first='${record.get(0)}'")
    val row = results.one()
    var scyllaRow: Person = null
    if (row != null) {
      scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
    }

    scyllaRow
  })

  session.close()
  cluster.close()
  res
})
vdf.show()

But I come across a "no host available" exception (though there are no connectivity issues; the same query works fine with a standalone Java client):

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:210)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:274)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:114)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:94)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 27 more

Any help is appreciated.

Upvotes: 2

Views: 596

Answers (2)

Erick Ramirez

Reputation: 16373

You need to use the Spark Cassandra connector to connect to a Cassandra database from Spark.

The connector is available from here -- https://github.com/datastax/spark-cassandra-connector. But since you're connecting to a Scylla DB, you'll likely need to use Scylla's fork of the connector. Cheers!
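For reference, once the connector (or Scylla's fork of it) is on the classpath, the same lookup can go through Spark's DataFrame reader instead of a hand-built driver session. A minimal sketch, assuming a local node and the keyspace/table names from the question:

val spark = SparkSession.builder.appName("ScyllaSparkClient")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .master("local[1]")
  .getOrCreate()

// Expose MyKeySpace.MyColumns as a DataFrame through the connector's data source
val people = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "MyKeySpace", "table" -> "MyColumns"))
  .load()

// Push the lookup down as a filter instead of issuing per-row CQL queries
people.filter("id = '2' AND first = 'John'").show()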

Upvotes: 2

Swapnil Chougule

Reputation: 735

Use CassandraConnector from com.datastax.spark.connector.cql. It will take care of session management for each partition. (In your original code the session is closed before Spark consumes the lazy iterator returned from mapPartitions, which is why no live host is found by the time the queries actually run.)

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder.appName("ScyllaSparkClient")
    .config("spark.cassandra.connection.host", "localhost")
    .master("local[1]")
    .getOrCreate()

  import spark.implicits._
  val m = Map("John" -> 2)
  val df = m.toSeq.toDF("first", "id")
  df.show

  // The connector is serializable; sessions are cached and shared per executor JVM
  val connector = CassandraConnector(spark.sparkContext.getConf)

  val vdf = df.mapPartitions(p => {
    connector.withSessionDo { session =>
      val res = p.map(record => {
        val results = session.execute(s"SELECT * FROM MyKeySpace.MyColumns where id='${record.get(1)}' and first='${record.get(0)}'")
        val row = results.one()
        var scyllaRow: Person = null
        if (row != null) {
          scyllaRow = Person(row.getString("id").toInt, row.getString("first"), row.getString("last"))
        }
        scyllaRow
      })
      res
    }
  })
  vdf.show()
}

It will work!
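As a side note, if the lookup side grows beyond a toy example, the connector's RDD API has joinWithCassandraTable, which performs the per-key fetch for you. A sketch, assuming id and first together form the table's partition key:

import com.datastax.spark.connector._

// Each ("id", "first") pair is looked up against MyKeySpace.MyColumns
val keys = spark.sparkContext.parallelize(Seq(("2", "John")))
val joined = keys.joinWithCassandraTable("MyKeySpace", "MyColumns")
  .on(SomeColumns("id", "first"))
joined.collect().foreach(println)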

Upvotes: 1
