user8053367
user8053367

Reputation: 1

use spark to scan multiple cassandra tables using spark-cassandra-connector

I have a problem of how to use spark to manipulate/iterate/scan multiple tables of cassandra. Our project uses spark&spark-cassandra-connector connecting to cassandra to scan multiple tables , try to match related value in different tables and if matched, take the extra action such as table inserting. The use case is like below:

sc.cassandraTable(KEYSPACE, "table1").foreach(
  row => {
     val company_url = row.getString("company_url")

     sc.cassandraTable(keyspace, "table2").foreach(
         val url = row.getString("url")
         val value = row.getString("value")
         if (company_url == url) {
            sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
         }
     )
})

The problems are

  1. As spark RDD is not serializable, the nested search will fail cause sc.cassandraTable returns a RDD. The only way I know to work around is to use sc.broadcast(sometable.collect()). But if the sometable is huge, the collect will consume all the memory. And also, if in the use case, several tables use the broadcast, it will drain the memory.

  2. Instead of broadcast, can RDD.persist handle the case? In my case, I use sc.cassandraTable to read all tables in RDD and persist back to disk, then retrieve the data back for processing. If it works, how can I guarantee the rdd read is done by chunks?

  3. Other than spark, is there any other tool (like hadoop etc.??) which can handle the case gracefully?

Upvotes: 0

Views: 655

Answers (1)

RussS
RussS

Reputation: 16576

It looks like you are actually trying to do a series of Inner Joins. See the

joinWithCassandraTable Method

This allows you to use the elements of One RDD to do a direct query on a Cassandra Table. Depending on the fraction of data you are reading from Cassandra this may be your best bet. If the fraction is too large though you are better off reading the two table separately and then using the RDD.join method to line up rows.

If all else fails you can always manually use the CassandraConnector Object to directly access the Java Driver and do raw requests with that from a distributed context.

Upvotes: 0

Related Questions