I have a large table that grows vertically. I want to read it in small batches, so that I can process each batch and save the results.
Table definition
CREATE TABLE foo.bar (
uid timeuuid,
events blob,
PRIMARY KEY ((uid))
)
// Step 1. Get uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84
// Step 2. Use last row as a pointer to the start of the next batch
val cc = new CassandraSQLContext(sc)
val cql = s"SELECT events from foo.bar where token(uid) > token($lastUUID) limit $max"
// which is at runtime
// SELECT events from foo.bar WHERE
// token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10
cc.sql(cql).collect()
The last line throws:
Exception in thread "main" java.lang.RuntimeException: [1.79] failure: ``)'' expected but identifier ea620 found
SELECT events from foo.bar where token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10
                                                       ^
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
But the same CQL returns the correct 10 records when I run it in cqlsh.
// Step 1. Get uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84
// Step 2. Execute query
rdd.where(s"token(uid) > token($lastUUID)").take(max)
This throws:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.io.IOException: Exception during preparation of SELECT "uid", "events" FROM "foo"."bar" WHERE token("uid") > ? AND token("uid") <= ? AND uid > $lastUUID ALLOW FILTERING: line 1:118 no viable alternative at character '$'
How do I use where token(...) queries in Spark and Cassandra?
I would use the DataStax Cassandra Java Driver. Similar to your CassandraSQLContext, you would select chunks like this:
val query = QueryBuilder.select("events")
.where(gt(token("uid"),token(lastUUID))
.limit(10)
val rows = session.execute(query).all()
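For completeness, a minimal sketch of the Cluster and Session setup the snippet above assumes (127.0.0.1 is a placeholder contact point):
import com.datastax.driver.core.Cluster

// Connect to Cassandra; replace the contact point with one of your nodes.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()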
If you want to query asynchronously, session also has executeAsync, which returns a ResultSetFuture (a Guava ListenableFuture) that can be wrapped in a Scala Future by adding a callback.
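A minimal sketch of that wrapping, assuming Guava is on the classpath (the helper name toScalaFuture is mine):
import scala.concurrent.{Future, Promise}
import com.datastax.driver.core.{ResultSet, ResultSetFuture}
import com.google.common.util.concurrent.{FutureCallback, Futures}

// Adapt the driver's Guava-based future to a Scala Future via a callback.
def toScalaFuture(rsf: ResultSetFuture): Future[ResultSet] = {
  val p = Promise[ResultSet]()
  Futures.addCallback(rsf, new FutureCallback[ResultSet] {
    override def onSuccess(rs: ResultSet): Unit = p.success(rs)
    override def onFailure(t: Throwable): Unit = p.failure(t)
  })
  p.future
}
// Usage:
// val f: Future[ResultSet] = toScalaFuture(session.executeAsync(query))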