Reputation: 1352
Avoiding duplicates on concurrent reads off the same database table
We have a table that contains a list of tasks
Table RecordsTable
RecordID
RecordName
...
...
IsProcessed
Multiple worker machines read off the table and once the task is processed mark IsProcessed as true.
So if we want the following code to work without duplicates
Pseudocode in C#
//get first 10 records that are not processed based on some other conditions
var recordSet = objectontext.recordstable.Where(...).Where(c => c.IsProcessed == false).Take(10);
//loop through the recordset in a transaction
foreach(record singleRecord in recordSet)
{
bool result = ProcessRecord();
//Mark isProcessed as true
if(result)
singleRecord.IsProcessed = true;
objectContext.Savechanges();
}
We want to avoid duplicate processing of records (since the ProcessRecords() contain mailers and such). If we wrap the entire code above in a transaction does it mean that two calls from two different workers would result in non-duplicate records?
If workerA first issues the call to the table it gets,
var recordSetWorkerA = objectontext.recordstable.Where(somecondition...).Where(c => c.IsProcessed == false).Take(10);
If workerB issues a call after worker A already is in the transaction would the following statment fail to execute because trying to read locked rows or move to the next 10 records?
var recordSetWorkerB = objectontext.recordstable.Where(somecondition...).Where(c => c.IsProcessed == false).Take(10);
Is there any pattern we should be looking at.
Upvotes: 1
Views: 2177
Reputation: 5832
Just wrapping your code into transaction won't be enough. You'll of course get exception on SaveChanges
, but it'll be too late.
What you really need is marking records as being processed, not just completed processing. I see two solutions:
If you workers share same state (meaning they are thread in one AppDomain, and not several concurrent worker services), you can use ConcurrentDictionary
to mark records you're being processed.
foreach(record singleRecord in recordSet)
{
//RecordsInProcess is a globally-available ConcurrentDictionary<recordIdType, record
if (!RecordsInProcess.TryAdd(singleRecord.RecordId, singleRecord))
continue; //TryAdd will return false if such an element already exists
bool result = ProcessRecord();
//Mark isProcessed as true
if(result)
singleRecord.IsProcessed = true;
objectContext.Savechanges();
record junk; // we don't need it
RecordsInProcess.TryRemove(singleRecordId, out junk)
}
processing
before you do anything and then continue with your processing.Upvotes: 1
Reputation: 8885
One option is to explicitly make isProcessed a tri-state enum of { ready, processing, processed }. I don't know how to do this in ActiveRecord, but you want a SQL statement like:
UPDATE RecordsTable
SET ProcessedState = 'processing'
WHERE RecordId = 1
AND ProcessedState = 'ready';
Ensure that exactly one row was updated by this statement. If zero rows were, someone beat you to that task. Make sure that this statement executes in its own transaction with at least "read committed" isolation level.
Upvotes: 1