Reputation: 26149
I have a queue of tasks that operate on a collection of objects (let's say the objects are entries in an address book, for the sake of example).
An example task might be "Update Joe's phone number to 888-555-1212".
It's possible to have multiple "Update Joe's phone number..." tasks in the queue simultaneously, but with different phone numbers. In this case, the updates must be applied in order to ensure the state is correct at the end (and no, for the sake of argument, it is not possible to put timestamps on the tasks and timestamps on the address book entries and throw away stale tasks).
It is safe to apply an update for Jane out-of-order with an update for Joe.
I would like to multithread processing of the queue, but I need to synchronize access by person.
Is there a handy library for this kind of thing? Or am I relegated to using an Executor and doing my own synchronization on "name" in the Runnable's run() method?
Upvotes: 6
Views: 1276
Reputation: 4223
Such conflicts are always resolved by assigning a version to each object. On every update version is incremented. So if one update comes in the wrong time it can be dismissed or delayed. Any way you should have a way to decide which update is the first and which is second. This method is called optimistic locking.
Upvotes: 0
Reputation: 16932
One Possible Solution
Assume a task is described by some class
class Task {
Integer taskGroup;
// other
}
where taskGroup is an ID which identifies tasks which have to be processes in the order of arrival (in your example, each "Name" could define its own taskGroup - or more generally - tasks w.r.t. the same name belong to the same taskGroup).
Let mainTaskQueue
denote a List
of Task objects. Then
Map<Integer,List<Task>>
, say taskGroupsQueues
taskGroupsQueues.get(taskGroup)
sequentially.task
from your main task lists mainTaskQueue
and appends it to taskGroups.get(task.taskGroup)
In other words: tasks which belong to the same name are executed on the same thread.
Note that if the main thread performs the distribution of the tasks, then he may also perform some kind of load balancing, i.e., if a task is not forced to a specific queue due to order consistency, that task should go to the shortes queue. Howerver, it is inherent in your problem that it may become single threaded - namely when you have only tasks beloging to the same taskGroup (in your case name).
Another Possible Solution (not tested, just a suggestion)
As pointed out in increment1s post and Aurands comment, the synchronization on the taskGroup (name) within the tread has some problems. Basically: it is too late, because the executor may have started two threads who try to sync on the same name. However, you may try to ensure order of execution at the executor level. See for example this post: Java Executors: how can I set task priority? (which references a PriorityBlockingQueue passed to the executor).
Upvotes: -1
Reputation: 7232
A straightforward, but not quite perfect, solution to this problem is to maintain a set of sub queues in an array equal to the number of processing threads you are running. A single master thread pulls items off of your single main queue and adds them to the sub queue indexed via the modulo of the object key's hashCode (the hashCode of whatever identifies and relates your tasks).
E.g.
int queueIndex = myEntity.getKey().hashCode() % queues.length;
Only one thread processes that queue, and all tasks for the same entity will be submitted to that queue, so there will be no race conditions.
This solution is imperfect since some threads may end up with larger queues than others. Practically, this is unlikely to matter but it is something to consider.
Issues with simple solution:
The simpler solution of pulling items off of a single queue and then locking on something distinct for the affected entity has a race condition (as Aurand pointed out). Given:
Master Queue [ Task1(entity1), Task2(entity1), ... ]
Where task1
and task2
both edit the same entity entity1
, and there is thread1
and thread2
operating on the queue, then the expected / desired sequence of events is:
Unfortunately, even if the lock is the first statement of the thread's run method, it is possible for the following sequence to occur:
To avoid this, each thread would would have to lock on something (say the queue) before taking a task from the queue, and then acquire a lock on the entity while still holding the parent lock. However, you do not want to block everything while holding this parent lock and waiting to acquire the entity lock, so you need to only try for the entity lock and then handle the case when it fails to acquire (put it into another queue perhaps). Overall the situation becomes non-trivial.
Upvotes: 3