Reputation: 454
I am using Spark Structured streaming along with Cassandra as a sink. Snippet below:
override def start(): StreamingQuery = {
  sparkContext.getSparkSession()
    .readStream
    .option("header", "false")
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("failOnDataLoss", "false")
    .option("subscribe", topicName)
    .load()
    .writeStream
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch(forEachFunction.arceusForEachFunction(_, _))
    .start()
}
And I am using the below to write to Cassandra inside the foreach:
RDD.saveToCassandra(keyspace, tableName)
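Simplified, the foreach function looks roughly like this (a sketch, not the exact implementation; the column mapping is a placeholder, and keyspace/tableName come from the job config):

import com.datastax.spark.connector._
import org.apache.spark.sql.DataFrame

def arceusForEachFunction(df: DataFrame, batchId: Long): Unit = {
  // Convert the micro-batch DataFrame to an RDD of tuples and write it
  // with the RDD API of the Spark Cassandra Connector.
  val rdd = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .rdd
    .map(row => (row.getString(0), row.getString(1)))
  rdd.saveToCassandra(keyspace, tableName)   // keyspace and tableName come from config
}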
While doing this I was wondering how to handle issues like Cassandra going down, etc. Suppose that out of 3M records to be loaded, 2M were written before the failure occurred. Now I either have to undo the 2M or process only the remaining 1M, and I am not sure what happens in such scenarios.
Is this somehow taken care of? Or is there something I have to write to take care of this?
I also looked at the Spark docs, and for foreachBatch they say the guarantees "depend on the implementation".
Any help is appreciated. Thanks
Upvotes: 1
Views: 556
Reputation: 87299
First, if you use foreachBatch, you can just write the dataframe as-is, without converting it to an RDD (here is an example):
.foreachBatch((df, batchId) =>
  df.write.cassandraFormat("sttest", "test")
    .mode(SaveMode.Append)
    .save()
)
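Wired into your start() method, it could look like this (a sketch; bootstrapServer, topicName, and checkpointLocation are the values from your snippet):

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.cassandra._   // adds cassandraFormat to DataFrameWriter
import org.apache.spark.sql.streaming.StreamingQuery

override def start(): StreamingQuery = {
  // Assigning the function to a typed val avoids the foreachBatch overload
  // ambiguity that can show up when compiling with Scala 2.12.
  val writeToCassandra: (DataFrame, Long) => Unit = (df, _) =>
    df.write
      .cassandraFormat("sttest", "test")   // table, keyspace
      .mode(SaveMode.Append)
      .save()

  sparkContext.getSparkSession()
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServer)
    .option("failOnDataLoss", "false")
    .option("subscribe", topicName)
    .load()
    .writeStream
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch(writeToCassandra)
    .start()
}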
Regarding recovery - you can't undo a write to Cassandra; it's not a transactional database, so once data is written, it stays written. But in most cases writes are idempotent (unless you're using operations on lists or LWTs), so you can simply write the same data again. The Spark Cassandra Connector also retries write operations automatically if it detects that a node is down, so you should be covered by this.
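As a sketch of why replaying is safe (assuming the primary key fully identifies each row, so a re-write is just an overwrite with the same values; the table and columns below are hypothetical):

import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Cassandra writes are upserts keyed by the primary key, so writing the same
// rows twice produces exactly the same end state - nothing to undo.
def writeBatch(sc: SparkContext): Unit = {
  val rows = sc.parallelize(Seq(("id-1", "v1"), ("id-2", "v2")))
  rows.saveToCassandra("test", "sttest", SomeColumns("id", "value"))   // hypothetical table/columns
}

// If the job dies after part of a micro-batch is written, restarting the stream
// with the same checkpointLocation replays that micro-batch and simply
// overwrites the rows that already made it.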
P.S. The new version of the Spark Cassandra Connector (currently in alpha) will support writing Spark Structured Streaming data to Cassandra natively.
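If you want to experiment with it, the sink would presumably be used roughly like this (a sketch; the data source name is assumed from the batch API, the Kafka options, keyspace/table, and checkpoint path are placeholders - check the connector docs for the released API):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder
  .option("subscribe", "my-topic")                        // placeholder
  .load()
  .selectExpr("CAST(key AS STRING) AS id", "CAST(value AS STRING) AS value")

// Native streaming sink; no foreachBatch needed.
val query = df.writeStream
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test")
  .option("table", "sttest")
  .option("checkpointLocation", "/tmp/checkpoints/cassandra-sink")   // placeholder
  .outputMode("append")
  .start()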
Upvotes: 1