Reputation: 454
I have a realtime streaming solution with Kafka, Spark (as the aggregation engine) and Cassandra (as the store). User defines the aggregates that are needed and the engine creates the aggregate and writes them to the store. Here is an example of how the aggregates are created
CREATE AGGR COUNT FROM input_data WHERE type,event,id
This creates a count aggregate for the 3 columns and writes to C*.
We have a requirement to process historical data as well. That means if an aggregate was created today, we need to go back and fix history for it. To cater to this use case, we have created a hvalue column in Cassandra. Here is the schema for reference
CREATE TABLE tbl (
key blob,
key2 blob,
key3 blob,
...
key15 blob,
column1 blob,
column2 blob,
...
column20 blob,
*hvalue* blob,
*value* blob,
PRIMARY KEY ((key, key2, key3 ... key15), column1 ... column20)
) WITH CLUSTERING ORDER BY (column1 ASC,column2 ASC .. column20 ASC)
value stores the facts that are computed while online processing. hvalue stores the value for historical processing. While querying, both the columns are retrieved, merged and returned to user.
We are using datastax leftJoin API to join with Cassandra.
RDD.leftJoinWithCassandraTable(keyspace,tableName)
.on(SomeColumns(...)
.map { case (ip, row) => row match {
case None => ip
case Some(data) => CASSANDRA_MAP_SCHEMA(...)
)
}
}.saveToCassandra(keyspace,tableName)
In short, we create a schema for the RDD, and write the row to Cassandra.
Now, here is the problem. During the historical process, we need to create a row to write to Cassandra. This means that we need to provide some data to the "value" column. If it is a new row that is not present in Cassandra, we create a null object and write back. If the row is present, we take the existing value and write it back. The online and historical process will run at the same time. This means that when the historical process reads a row, and writes back, the online process may have created the same row. This will result in corrupt data, since the historical process may read a stale data and update the value that was written by the online process. I am not sure how to resolve this problem. I'll appreciate if there is any other solutions to prevent this. I tried to explain the best I can, let me know if further clarifications are needed and I'll try to add more inputs.
Thanks in advance for the help.
Upvotes: 0
Views: 391
Reputation: 167
There are a few ways to work around this, but none are really simple. Fundamentally write after write problems are hard.
The first is that you introduce a shared external locking mechanism where you obtain a lock for the row and either release it when it is done or have a short ttl. You can use something like Redis for this.
A second option is to funnel all changes to Cassandra through a kafka queue so that only one source is allowed to write. Though there is a chance that this will make your problem worse. If you are going to do this, make sure that you are partitioning your queue based on keys so that the same key always routes to the same queue.
A third option is that the services are only allowed to operate on data for a given time range. If your online data is only allowed to work on data in the last day, or X hours, etc. and your historical is only allowed to work on data that is more than that period of time old then there is virtually no chance of running into conflicts.
The fourth option is to accept that it is a possibility and that the possibility of it happening is small enough that it isn't an issue. If the datacenter where your code runs is very close (ideally colocated with your db) and you aren't doing significant processing on the row between read and write this may be a reasonable option.
Upvotes: 1