Reputation: 1
I have a question about how to use Spark to scan and iterate over multiple Cassandra tables. Our project uses Spark with the spark-cassandra-connector to scan several tables, match related values across them and, when a match is found, take a further action such as inserting into another table. The use case looks like this:
sc.cassandraTable(KEYSPACE, "table1").foreach(
  row => {
    val company_url = row.getString("company_url")
    // Nested RDD scan inside another RDD's foreach: this is the part that fails
    sc.cassandraTable(KEYSPACE, "table2").foreach(
      row2 => {
        val url = row2.getString("url")
        val value = row2.getString("value")
        if (company_url == url) {
          // Pseudocode: insert (url, value) into the "target" table
          sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
        }
      })
  })
The problems are:
Spark RDDs cannot be nested (neither an RDD nor the SparkContext can be used inside another RDD's closure), so the nested scan fails because sc.cassandraTable returns an RDD. The only workaround I know of is sc.broadcast(sometable.collect()). But if sometable is huge, the collect will consume all the memory, and if several tables in the use case need to be broadcast, memory will run out.
Instead of broadcast, can RDD.persist handle this case? My idea is to read every table into an RDD with sc.cassandraTable, persist the RDDs to disk, and then read the data back for processing. If that works, how can I guarantee that the RDD is read in chunks?
Other than Spark, is there another tool (Hadoop, for example) that can handle this case gracefully?
Upvotes: 0
Views: 655
Reputation: 16576
It looks like you are actually trying to do a series of inner joins. See the joinWithCassandraTable method.
This allows you to use the elements of one RDD to query a Cassandra table directly. Depending on the fraction of data you are reading from Cassandra, this may be your best bet. If the fraction is too large, though, you are better off reading the two tables separately and then using the RDD.join method to line up rows.
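As a rough sketch of both options applied to the tables in the question: the code below assumes that `url` is the partition key of `table2` (joinWithCassandraTable joins on partition key columns by default) and that `target` has `url` and `value` columns. The keyspace name, the connection host, the `UrlKey` case class and the object name are placeholders made up for illustration, not anything taken from the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Hypothetical key type: the field name matches the column that is
// ASSUMED to be table2's partition key.
case class UrlKey(url: String)

object JoinSketch {
  val Keyspace = "my_keyspace" // placeholder keyspace name

  // Option 1: for every company_url in table1, query table2 directly.
  // Suited to the case where only a small fraction of table2 is needed.
  def joinViaCassandra(sc: SparkContext): Unit = {
    sc.cassandraTable(Keyspace, "table1")
      .map(row => UrlKey(row.getString("company_url")))
      .joinWithCassandraTable(Keyspace, "table2")
      .on(SomeColumns("url")) // assumes url is table2's partition key
      .map { case (key, row) => (key.url, row.getString("value")) }
      .saveToCassandra(Keyspace, "target", SomeColumns("url", "value"))
  }

  // Option 2: read both tables in full and let Spark shuffle-join them.
  // Suited to the case where most of table2 would be read anyway.
  def joinViaSpark(sc: SparkContext): Unit = {
    val left = sc.cassandraTable(Keyspace, "table1")
      .map(row => (row.getString("company_url"), ()))
    val right = sc.cassandraTable(Keyspace, "table2")
      .map(row => (row.getString("url"), row.getString("value")))

    left.join(right)
      .map { case (url, (_, value)) => (url, value) }
      .saveToCassandra(Keyspace, "target", SomeColumns("url", "value"))
  }

  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val conf = new SparkConf()
      .setAppName("join-sketch")
      .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder host
    val sc = new SparkContext(conf)
    try joinViaCassandra(sc) finally sc.stop()
  }
}
```

Neither variant collects a table onto the driver or broadcasts it. Both process the data partition by partition on the executors, which addresses the memory concern from the question.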
If all else fails, you can always use the CassandraConnector object manually to access the Java driver directly and issue raw requests from a distributed context.
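A minimal sketch of that last-resort approach, under the same assumptions as the question's tables (`table2` can be queried by `url`, and `target` has `url` and `value` columns). The keyspace name and the object and method names are made up for illustration. The CassandraConnector instance is serializable, so it can be captured by the closure and used on the executors.

```scala
import org.apache.spark.SparkContext
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

object RawDriverSketch {
  val Keyspace = "my_keyspace" // placeholder keyspace name

  def matchAndInsert(sc: SparkContext): Unit = {
    // Built on the driver, shipped to the executors with the closure.
    val connector = CassandraConnector(sc.getConf)

    sc.cassandraTable(Keyspace, "table1").foreachPartition { rows =>
      // One session (and one pair of prepared statements) per partition.
      connector.withSessionDo { session =>
        val select = session.prepare(
          s"SELECT value FROM $Keyspace.table2 WHERE url = ?")
        val insert = session.prepare(
          s"INSERT INTO $Keyspace.target (url, value) VALUES (?, ?)")

        rows.foreach { row =>
          val url = row.getString("company_url")
          // Raw Java driver request issued from inside the Spark task.
          val matches = session.execute(select.bind(url)).iterator()
          while (matches.hasNext) {
            val value = matches.next().getString("value")
            session.execute(insert.bind(url, value))
          }
        }
      }
    }
  }
}
```

This issues one synchronous query per row of `table1`, so it is likely to be slower than either join. It mainly makes sense when the lookup logic cannot be expressed as a join.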
Upvotes: 0