oleksii

Reputation: 35905

Spark: how to read chunk of a table from Cassandra

I have a large table that keeps growing row by row. I want to read the rows in small batches, so that I can process each batch and save the results.

Table definition

CREATE TABLE foo (
  uid timeuuid,
  events blob,
  PRIMARY KEY ((uid))
)

Code attempt 1 - using CassandraSQLContext

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext

// Step 1. Get the uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84

// Step 2. Use the last row as a pointer to the start of the next batch
val cc = new CassandraSQLContext(sc)
val cql = s"SELECT events from foo.bar where token(uid) > token($lastUUID) limit $max"

// which at runtime is:
// SELECT events from foo.bar WHERE
//   token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10

cc.sql(cql).collect()

The last line throws:

Exception in thread "main" java.lang.RuntimeException: [1.79] failure: ``)'' expected but identifier ea620 found

SELECT events from foo.bar where token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10
                                                       ^
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)

Yet the same CQL returns the correct 10 records when I run it in cqlsh.
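Judging by the parse error, the SparkSQL lexer splits the bare timeuuid into the number 131 and the identifier ea620. A minimal sketch of a workaround that at least gets past the parser (I am assuming quoting is enough here; whether CassandraSQLContext's SQL dialect understands token() at all is a separate question):

// Quote the timeuuid so it is lexed as one string literal
val cql = s"SELECT events FROM foo.bar WHERE token(uid) > token('$lastUUID') LIMIT $max"
cc.sql(cql).collect()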

Code attempt 2 - using DataStax Cassandra connector

import com.datastax.spark.connector._

// Step 1. Get the uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84

// Step 2. Filter the next batch server-side with a where clause
rdd.where(s"token(uid) > token($lastUUID)").take(max)

This throws:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0
failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost):
java.io.IOException: Exception during preparation of
SELECT "uid", "events" FROM "foo"."bar"
WHERE token("uid") > ? AND token("uid") <= ? AND uid > $lastUUID ALLOW FILTERING:
line 1:118 no viable alternative at character '$'
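Note that the prepared statement above contains the literal text $lastUUID, so my value apparently never made it into the CQL. The connector's where also accepts ? bind markers, which avoids string interpolation entirely; a sketch (I am not sure a user-supplied token() predicate composes with the token ranges the connector adds for partitioning, so treat this as an untested assumption):

// Let the connector bind the uuid instead of splicing it into the CQL string
rdd.where("token(uid) > token(?)", lastUUID).take(max)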

How do I use where token(...) queries with Spark and Cassandra?

Upvotes: 2

Views: 3888

Answers (1)

Arne Claassen

Reputation: 14404

I would use the DataStax Cassandra Java Driver directly. Similar to your CassandraSQLContext attempt, you would select chunks like this:

import com.datastax.driver.core.querybuilder.QueryBuilder._

val query = select("events")
  .from("foo", "bar")                                 // keyspace, table
  .where(gt(token("uid"), fcall("token", lastUUID)))  // token(uid) > token(<uuid>)
  .limit(10)
val rows = session.execute(query).all()
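For what it's worth, the Java driver (2.0+ against Cassandra 2.0+) can also page transparently: query.setFetchSize(max) tells the driver to fetch rows in chunks of that size as you iterate the ResultSet, which may remove the need for manual token() chunking altogether.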

If you want to query asynchronously, session also has executeAsync, which returns a ResultSetFuture (a Guava ListenableFuture) that can be wrapped in a Scala Future by registering a callback.
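A minimal sketch of that wrapping, assuming the Guava version bundled with the Java driver (toScalaFuture is just an illustrative name):

import scala.concurrent.{Future, Promise}
import com.google.common.util.concurrent.{FutureCallback, Futures}
import com.datastax.driver.core.{ResultSet, ResultSetFuture}

// Adapt the driver's ResultSetFuture (a Guava ListenableFuture) to a Scala Future
def toScalaFuture(rsf: ResultSetFuture): Future[ResultSet] = {
  val p = Promise[ResultSet]()
  Futures.addCallback(rsf, new FutureCallback[ResultSet] {
    def onSuccess(rs: ResultSet): Unit = p.success(rs)
    def onFailure(t: Throwable): Unit = p.failure(t)
  })
  p.future
}

val futureRows = toScalaFuture(session.executeAsync(query))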

Upvotes: 0
