Reputation: 2859
I'd like to query some data from Cassandra based on values I have in an RDD. My approach is the following:
val userIds = sc.textFile("/tmp/user_ids").keyBy(e => e)
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name")
val userNames = userIds.flatMap { userId =>
  t.where("userid = ?", userId).take(1)
}
userNames.take(1)
While the Cassandra query works on its own in the Spark shell, it throws an exception when I use it inside flatMap:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException:
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94)
My understanding is that I cannot create an RDD (the Cassandra results) inside a transformation on another RDD.
The examples I found on the web read the whole Cassandra table into an RDD and then join the two RDDs (like this: https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/). But that won't scale if the Cassandra table is huge.
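For reference, my sketch of the pattern those examples use (assuming userid is a text column, so it matches the strings read from the file):

val users = sc.cassandraTable("keyspace", "users")
  .select("userid", "user_name")
  .map(row => (row.getString("userid"), row.getString("user_name")))

// userIds is keyed by the id string, so a plain RDD join lines the two up.
val userNames = userIds.join(users).map { case (_, (_, name)) => name }

This reads the entire users table just to look up a handful of ids.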
How should I approach this problem instead?
Upvotes: 3
Views: 6720
Reputation: 16576
The Spark Cassandra Connector 1.2 introduces joinWithCassandraTable:
import com.datastax.spark.connector._  // provides the joinWithCassandraTable implicit

val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
  .map(Tuple1(_))
  .joinWithCassandraTable("keyspace", "table")
This code will end up doing identical work to the solution below. The joinWithCassandraTable method uses the same code that saveToCassandra uses to transform classes into something Cassandra can understand, which is why we need a tuple rather than just a simple string to perform the join.
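The join pairs each key tuple with the matching CassandraRow, so unpacking the result looks roughly like this (my sketch against the question's keyspace and column names, which I'm assuming here):

userids
  .map(Tuple1(_))
  .joinWithCassandraTable("keyspace", "users")
  .map { case (Tuple1(id), row) => (id, row.getString("user_name")) }  // (userid, name) pairs
  .collect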
I think what you actually want to do here is an inner join on the two data sources. This should also be faster than a flatMap approach, since the join benefits from some internal smart hashing.
scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)
scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})
scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))
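If you only want the names back, a small follow-up on the same join drops the placeholder boolean:

userids
  .map(line => (line.toInt, true))
  .join(sc.cassandraTable("test", "users").map(row => (row.getInt("userid"), row.getString("username"))))
  .map { case (_, (_, name)) => name }
  .collect  // e.g. Array(Helena, Jacek)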
If you actually just want to execute a bunch of primary-key queries against your C* database, you may be better off executing them through the normal driver pathways and not using Spark.
import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._

// CassandraConnector manages shared sessions on the executors.
val cc = CassandraConnector(sc.getConf)
val select = "SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)

ids.flatMap { id =>
  cc.withSessionDo { session =>
    session.execute(select, id: java.lang.Integer)
      .iterator.toList  // JavaConversions turns the Java iterator into a Scala one
      .map(row => (row.getInt("userid"), row.getString("username")))
  }
}.collect
Upvotes: 7