Reputation: 4489
Assume there is a worker service that receives messages from a queue, reads the product with the specified Id from a document database, applies some manipulation logic based on the message, and finally writes the updated product back to the database (a).
This work can be safely done in parallel when dealing with different products, so we can scale horizontally (b). However, if more than one service instance works on the same product, we might end up with concurrency issues, or concurrency exceptions from the database, in which case we should apply some retry logic (and still the retry might fail again and so on).
Question: How do we avoid this? Is there a way I can ensure two instances are not working on the same product?
Example/Use case: An online store has a great sale on productA, productB and productC that ends in an hour and hundreds of customers are buying. For each purchase, a message is enqueued (productId, numberOfItems, price). Goal: How can we run three instances of our worker service and make sure that all messages for productA will end up in instanceA, productB to instanceB and productC to instanceC (resulting in no concurrency issues)?
Notes: My service is written in C#, hosted on Azure as a Worker Role, I use Azure Queues for messaging, and I'm thinking to use Mongo for storage. Also, the Entity IDs are GUID
.
It's more about the technique/design, so if you use different tools to solve the problem I'm still interested.
Upvotes: 12
Views: 3707
Reputation: 1
You can make use of Kafka here, with productId being the partitioning key. That way the product with same productId will end up in the same partition, and hence will be processed by the same consumer application in the order they were sent. Kafka partition gaurantees that same keys always end up in the same partition.
You can create 3 partitions, given your use-case, and make sure that 3 different consumers are attached to each partition. This way there wont be any synchronization issues.
Upvotes: 0
Reputation: 116
The problem here is that two process trying to access a common resource to perform update simultaneously. If we analyze the current situation, let's say two worker W1 and W2 trying to work on the same product, say product A.
If we allow both worker 1 and worker 2 run in parallel, it can lead to either of two cases (considering race conditions only)
So, now let's think about multiple solutions around it
Distribute task such that specific worker node processes specific product oriented message. It could be achieved simply by having different Azure Queues for different products and having different workers processing messaged from different queue. The downside with this approach is that it doesn't scale well and can lead to hotspot/overhead if a specific product is bought continuously.
Utilizing distributed Locking to take lock on a product before performing any action. The worker process taking the lock, should ensure to unlock it, otherwise it can lead to deadlock situation. It should also consider situation if the system breaks down or gets crashed before unlocking. If a process doesn't get a lock it should wait until a timeout to get the lock to complete the processing. Now, busy wait, i.e., burning CPU cycle during wait is not a good idea.
Upvotes: 0
Reputation: 1
You should use session enabled service bus queue for ordering and concurrency.
Upvotes: 0
Reputation: 33
I assume you have a means to safely access the product queue across all worker services. Given that, one simple way to avoid conflict could be using global queues per product next to the main queue
// Queue[X] is the queue for product X
// QueueMain is the main queue
DoWork(ProductType X)
{
if (Queue[X].empty())
{
product = QueueMain().pop()
if (product.type != X)
{
Queue[product.type].push(product)
return;
}
}else
{
product = Queue[X].pop()
}
//process product...
}
The access to queues need to be atomic
Upvotes: 0
Reputation: 44066
1) Every high scale data solution that I can think of has something built in to handle precisely this sort of conflict. The details will depend on your final choice for data storage. In the case of a traditional relational database, this comes baked in without any add'l work on your part. Refer to your chosen technology's documentation for appropriate detail.
2) Understand your data model and usage patterns. Design your datastore appropriately. Don't design for scale that you won't have. Optimize for your most common usage patterns.
3) Challenge your assumptions. Do you actually have to mutate the same entity very frequently from multiple roles? Sometimes the answer is yes, but often you can simply create a new entity that's similar to reflect the update. IE, take a journaling/logging approach instead of a single-entity approach. Ultimately high volumes of updates on a single entity will never scale.
Upvotes: -1
Reputation: 9189
If you want to always have the database up to date and always consistent with the already processed units then you have several updates on the same mutable entity.
In order to comply with this you need to serialize the updates for the same entity. Either you do this by partitioning your data at producers, either you accumulate the events for the entity on the same queue, either you lock the entity in the worker using an distributed lock or a lock at the database level.
You could use an actor model (in java/scala world using akka) that is creating a message queue for each entity or group of entities that process them serially.
UPDATED You can try an akka port to .net and here. Here you can find a nice tutorial with samples about using akka in scala. But for general principles you should search more about [actor model]. It has drawbacks nevertheless.
In the end pertains to partition your data and ability to create a unique specialized worker(that could be reused and/or restarted in case of failure) for a specific entity.
Upvotes: 1
Reputation: 101150
Any solution attempting to divide the load upon different items in the same collection (like orders) are doomed to fail. The reason is that if you got a high rate of transactions flowing you'll have to start doing one of the following things:
hey guys, are anyone working with this?
)so what's wrong with those approaches?
The first approach is simply replicating transactions in a database. Unless you can spend a large amount of time optimizing the strategy it's better to rely on transactions.
The second two options will decrease performance as you have to dynamically route messages upon ids and also change the strategy at run-time to also include newly inserted messages. It will fail eventually.
Here are two solutions that you can also combine.
Instead you have an entry point somewhere that reads from the message queue.
In it you have something like this:
while (true)
{
var message = queue.Read();
Process(message);
}
What you could do instead to get very simple fault tolerance is to retry upon failure:
while (true)
{
for (i = 0; i < 3; i++)
{
try
{
var message = queue.Read();
Process(message);
break; //exit for loop
}
catch (Exception ex)
{
//log
//no throw = for loop runs the next attempt
}
}
}
You could of course just catch db exceptions (or rather transaction failures) to just replay those messages.
I know, Micro service is a buzz word. But in this case it's a great solution. Instead of having a monolithic core which processes all messages, divide the application in smaller parts. Or in your case just deactivate the processing of certain types of messages.
If you have five nodes running your application you can make sure that Node A receives messages related to orders, node B receives messages related to shipping etc.
By doing so you can still horizontally scale your application, you get no conflicts and it requires little effort (a few more message queues and reconfigure each node).
Upvotes: 2
Reputation: 15850
For this kind of a thing I use blob leases. Basically, I create a blob with the ID of an entity in some known storage account. When worker 1 picks up the entity, it tries to acquire a lease on the blob (and create the blob itself, if it doesn't exist). If it is successful in doing both, then I allow the processing of the message to occur. Always release the lease afterwards. If I am not successfull, I dump the message back onto the queue
I follow the apporach originally described by Steve Marx here http://blog.smarx.com/posts/managing-concurrency-in-windows-azure-with-leases although tweaked to use new Storage Libraries
Edit after comments: If you have a potentially high rate of messages all talking to the same entity (as your commend implies), I would redesign your approach somewhere.. either entity structure, or messaging structure.
For example: consider CQRS design pattern and store changes from processing of every message independently. Whereby, product entity is now an aggregate of all changes done to the entity by various workers, sequentially re-applied and rehydrated into a single object
Upvotes: 1