Soid
Soid

Reputation: 2859

Filter from Cassandra table by RDD values

I'd like to query some data from Cassandra based on values I have in an RDD. My approach is the following:

val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e ) 
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name") 
val userNames = userIds.flatMap { userId => 
  t.where("userid = ?", userId).take(1) 
} 
userNames.take(1) 

While the Cassandra query works in Spark shell, it throws an exception when I used it inside flatMap:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException: 
        org.apache.spark.rdd.RDD.<init>(RDD.scala:125) 
        com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49) 
        com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83) 
        com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94) 

My understanding is that I cannot produce an RDD (Cassandra results) inside another RDD.

The examples I found on the web read the whole Cassandra table in an RDD and join RDDs (like this: https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/). But it won't scale if the Cassandra table is huge.

But how do I approach this problem instead?

Upvotes: 3

Views: 6720

Answers (1)

RussS
RussS

Reputation: 16576

Spark 1.2 or Greater

Spark 1.2 introduces joinWithCassandraTable

val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
 .map(Tuple1(_))
 .joinWithCassandraTable("keyspace","table")

This code will end up doing the identical work that the solution below does. The joinWithCassandraTable method will use the same code as the saveToCassandra uses to transform classes into something that Cassandra can understand. This is why we need a tuple rather than just a simple string to perform the join.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable


I think what you actually want to do here is do an inner join on the two datasources. This should actually be faster than a flatmap approach as well as there is some internal smart hashing.

scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)

scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})

scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))

If you actually just want to execute a bunch of primary key queries against your C* database you may be better off just executing them using normal driver pathways and not using spark.

Spark Solution Integrating with Direct Driver Calls

import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._

val cc = CassandraConnector(sc.getConf)
val select = s"SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
      cc.withSessionDo(session =>
        session.execute(select, id.toInt: java.lang.Integer).iterator.toList.map(row =>
          (row.getInt("userid"), row.getString("username"))))).collect

Upvotes: 7

Related Questions