Reputation: 595
I am working with Apache Spark and Cassandra, and I want to save my RDD to Cassandra with spark-cassandra-connector.
Here's the code:
import com.datastax.spark.connector._  // adds saveToCassandra to RDDs

def saveToCassandra(step: RDD[(String, String, Date, Int, Int)]) = {
  step.saveToCassandra("keyspace", "table")
}
This works fine most of the time, but it overwrites data that is already present in the database. I would like to avoid overwriting any existing data. Is that possible?
Upvotes: 3
Views: 2008
Reputation: 48
I think it's better to call withSessionDo outside the foreachPartition instead. There's overhead involved in that call that need not be repeated.
Upvotes: 1
Reputation: 6495
What I do is this:
rdd.foreachPartition(x => connector.withSessionDo(session => {
  // update the whole partition in one call
  someUpdater.UpdateEntries(x, session)
  // or
  x.foreach(y => someUpdater.UpdateEntry(y, session))
}))
The connector above is CassandraConnector(sparkConf). It's not as nice as a simple saveToCassandra, but it allows fine-grained control.
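For the original question (not overwriting existing rows), one way to use that session-level control is a CQL INSERT ... IF NOT EXISTS statement. The following is only a minimal sketch, not a drop-in solution. The names my_keyspace, my_table and the columns id, name, ts, a, b are hypothetical placeholders, and the function insertIfAbsent is made up for illustration. It also assumes a driver version that maps java.util.Date to a CQL timestamp; newer drivers may expect java.time.Instant instead.

```scala
import java.util.Date

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.rdd.RDD

// Hypothetical helper: writes each row only if its primary key is not present yet.
// Keyspace, table, and column names are placeholders; replace them with the real schema.
def insertIfAbsent(step: RDD[(String, String, Date, Int, Int)],
                   connector: CassandraConnector): Unit = {
  step.foreachPartition { rows =>
    // One session per partition, obtained through the connector.
    connector.withSessionDo { session =>
      // IF NOT EXISTS turns the insert into a lightweight transaction:
      // the row is written only when no row with that primary key exists.
      val stmt = session.prepare(
        "INSERT INTO my_keyspace.my_table (id, name, ts, a, b) " +
          "VALUES (?, ?, ?, ?, ?) IF NOT EXISTS")
      rows.foreach { case (id, name, ts, a, b) =>
        // bind takes object arguments, so the Int values are boxed explicitly.
        session.execute(stmt.bind(id, name, ts, Int.box(a), Int.box(b)))
      }
    }
  }
}
```

It would be called as insertIfAbsent(step, CassandraConnector(sparkConf)). Keep in mind that lightweight transactions are noticeably slower than plain inserts, so this trades write throughput for the guarantee that existing rows stay untouched.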
Upvotes: 4