oleksii

Reputation: 35905

How to iterate over large Cassandra table in small chunks in Spark

In my test environment I have 1 Cassandra node and 3 Spark nodes. I want to iterate over a fairly large table that has about 200k rows, each roughly 20-50 KB.

CREATE TABLE foo (
  uid timeuuid,
  events blob,
  PRIMARY KEY ((uid))
) 

Here is the Scala code that is executed on the Spark cluster:

val rdd = sc.cassandraTable("test", "foo")

// This pulls records into memory, taking ~6.3 GB
val count = rdd.select("events").count()

// Fails nearly immediately with
// NoHostAvailableException: All host(s) tried for query failed [...]
val events = rdd.select("events").collect()

Cassandra 2.0.9, Spark 1.2.1, spark-cassandra-connector 1.2.0-alpha2

I also tried running only collect, without count; in that case it fails fast with NoHostAvailableException as well.

Question: what is the correct approach to iterate over a large table, reading and processing a small batch of rows at a time?

Upvotes: 3

Views: 3886

Answers (1)

G Quintana

Reputation: 4667

There are two settings in the Spark Cassandra Connector to adjust the chunk size (put them in the SparkConf object; see the sketch after this list):

  • spark.cassandra.input.split.size: number of rows per Spark partition (default 100000)
  • spark.cassandra.input.page.row.size: number of rows per fetched page, i.e. per network round trip (default 1000)
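For example, here is a minimal sketch of wiring these settings into the SparkConf; the connection host, the application name, and the values 10000 and 100 are illustrative assumptions, not tuned recommendations:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val conf = new SparkConf()
  .setAppName("foo-reader")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // address of your Cassandra node
  .set("spark.cassandra.input.split.size", "10000")      // rows per Spark partition
  .set("spark.cassandra.input.page.row.size", "100")     // rows fetched per page (one round trip)

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("test", "foo").select("events")

Smaller values mean more, smaller partitions and pages, so each executor holds less data in memory at any given time.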

Furthermore, you shouldn't use the collect action in your example because it fetches all the rows into the driver application's memory and may raise an out-of-memory exception. You should only use collect if you know for sure it will produce a small number of rows. The count action is different: it produces only an integer. So I advise you to load your data from Cassandra like you did, process it, and store the result (in Cassandra, HDFS, whatever).
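As a hedged sketch of that pattern, the following derives a small per-row value on the executors and writes it back to Cassandra instead of collecting the large blobs into the driver; the target table test.foo_sizes (uid timeuuid PRIMARY KEY, size int) is a hypothetical example, not something from the question:

import com.datastax.spark.connector._

// Compute the size of each events blob on the executors, never pulling the blobs to the driver
val sizes = sc.cassandraTable("test", "foo")
  .select("uid", "events")
  .map(row => (row.getUUID("uid"), row.getBytes("events").remaining()))

// Write the (uid, size) pairs back to Cassandra
sizes.saveToCassandra("test", "foo_sizes", SomeColumns("uid", "size"))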

Upvotes: 6
