Reputation: 889
For my current development I have many threads (Producers) that create Tasks, and many threads that consume these Tasks (Consumers).
Each Producer is identified by a unique name; a Task is made of the name of the Producer that created it and some data.
My question concerns the data structure shared by the Producers and the Consumers.
Naively, we could imagine that Producers populate a concurrent-queue with Tasks and Consumers read/consume the Tasks stored in the concurrent-queue.
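That naive design could be sketched as follows (class and method names are illustrative, not from the question; the Task shape just follows the name-plus-data description above):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Naive design sketch: all Producers share one concurrent queue,
// all Consumers take from that same queue.
public class NaiveDesign {
    record Task(String name, String data) {}

    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();

    // Called by Producer threads.
    public void produce(String name, String data) {
        queue.add(new Task(name, data));
    }

    // Called by Consumer threads; blocks until a Task is available.
    public Task consume() throws InterruptedException {
        return queue.take();
    }
}
```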
I think that this solution would scale rather well, but one case is problematic: if a Producer very quickly creates two Tasks having the same name but not the same data (both tasks T1 and T2 have the same name, but T1 has data D1 and T2 has data D2), it is theoretically possible that they are consumed in the order T2 then T1!
Now, I imagine creating my own data structure (let's say MyQueue) based on a Map plus a Queue. Like a queue, it would have a pop() and a push() method.
The pop() method would be quite simple. The push() method would:
- check whether a Task with the same name is already inserted in MyQueue (doing a find() in the Map);
- if so, merge the data of the Task to be inserted with the data stored in the found Task;
- otherwise, insert the Task in the Map and add an entry in the Queue.
Of course, I'll have to make it safe for concurrent access... and that will certainly be my problem; I am almost sure that this solution won't scale.
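A single-lock sketch of that MyQueue idea (the merge callback, getters, and all names are assumptions; pop() returns null instead of blocking, and the coarse synchronized is precisely the scalability concern mentioned):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.BinaryOperator;

// Sketch of MyQueue: a Map for name lookup and merging, plus a Queue
// preserving insertion order of names. One lock guards both structures.
public class MyQueue<T> {
    public static final class Task<T> {
        private final String name;
        private T data;
        Task(String name, T data) { this.name = name; this.data = data; }
        public String getName() { return name; }
        public T getData() { return data; }
    }

    private final Map<String, Task<T>> byName = new HashMap<>();
    private final Queue<String> order = new ArrayDeque<>();
    private final BinaryOperator<T> merge;

    public MyQueue(BinaryOperator<T> merge) { this.merge = merge; }

    public synchronized void push(String name, T data) {
        Task<T> existing = byName.get(name);
        if (existing != null) {
            // Same name already pending: merge data into the queued Task.
            existing.data = merge.apply(existing.data, data);
        } else {
            byName.put(name, new Task<>(name, data));
            order.add(name);
        }
    }

    // Returns null when empty; a real implementation would block instead.
    public synchronized Task<T> pop() {
        String name = order.poll();
        return (name == null) ? null : byName.remove(name);
    }
}
```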
So my question now is: what is the best data structure to use in order to fulfill my requirements?
Upvotes: 6
Views: 882
Reputation: 226
If consumers are running in parallel, I doubt there is a way to make them execute tasks with the same name sequentially. In your example (from comments):
BlockingQueue can really be a problem (unfortunately) if a Producer "P1" adds a first task "T" with data D1 and quickly a second task "T" with data D2. In this case, the first task can be handled by one thread and the second task by another thread; if the thread handling the first task is interrupted, the thread handling the second one can complete first.
There is no difference if P1 submits D2 not so quickly: Consumer1 could still be too slow, so Consumer2 would be able to finish first. Here is an example of such a scenario:
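A sketch of such a race (illustrative, not the answer's original snippet): the slow consumer takes D1 first, then the fast consumer takes the later-submitted D2 and finishes first.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Demonstrates out-of-order completion with a single shared queue:
// D1 is consumed first but D2 completes first.
public class RaceDemo {
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StringBuffer completionOrder = new StringBuffer(); // thread-safe append

        queue.add("D1"); // task "T" with data D1, submitted first
        Thread slow = new Thread(() -> {
            try {
                String task = queue.take();  // takes D1
                Thread.sleep(200);           // this consumer is slow/interrupted
                completionOrder.append(task);
            } catch (InterruptedException ignored) {}
        });
        slow.start();

        // Wait until the slow consumer has taken D1 off the queue.
        while (!queue.isEmpty()) Thread.onSpinWait();

        queue.add("D2"); // same task name, newer data, submitted second
        Thread fast = new Thread(() -> {
            try {
                completionOrder.append(queue.take()); // takes D2, finishes at once
            } catch (InterruptedException ignored) {}
        });
        fast.start();

        slow.join();
        fast.join();
        return completionOrder.toString(); // "D2D1": the newer data finished first
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```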
To solve it, you will have to introduce some kind of completion detection, which I believe will overcomplicate things.
If you have enough load and can process some tasks with different names non-sequentially, then you can use a queue per consumer and put same-named tasks into the same queue.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ParallelQueue {
    private final BlockingQueue<Task<?>>[] queues;
    private final int consumersCount;

    @SuppressWarnings("unchecked")
    public ParallelQueue(int consumersCount) {
        this.consumersCount = consumersCount;
        queues = new BlockingQueue[consumersCount];
        for (int i = 0; i < consumersCount; i++) {
            queues[i] = new LinkedBlockingQueue<>();
        }
    }

    public void push(Task<?> task) {
        // floorMod avoids a negative index when hashCode() is negative
        int index = Math.floorMod(task.name.hashCode(), consumersCount);
        queues[index].add(task);
    }

    public Task<?> pop(int consumerId) throws InterruptedException {
        int index = consumerId % consumersCount;
        return queues[index].take();
    }

    private static final class Task<T> {
        private final String name;
        private final T data;

        private Task(String name, T data) {
            this.name = name;
            this.data = data;
        }
    }
}
Upvotes: 0
Reputation: 6058
Instead of making a data structure safe for concurrent access, why not opt out of concurrency and go parallel?
Functional programming models such as MapReduce are a very scalable way to solve this kind of problem.
I understand that D1 and D2 can be analyzed either together or in isolation, and the only constraint is that they shouldn't be analyzed in the wrong order (making some assumptions here). But in case the real problem is only the way the results are combined, there might be an easy solution.
You could remove the constraint altogether, allowing them to be analyzed separately, and then have a reduce function that is able to re-combine them in a sensible way.
In this case you'd have the first step as map and the second as reduce.
Even if the computation is more efficient when done in a single pass, a big part of scaling, especially scaling out, is accomplished by denormalization.
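That map/reduce split can be sketched with Java streams (the Input record and analyze() are placeholder assumptions, not from the answer): each datum is analyzed independently and in parallel, then results are regrouped per task name for a later combine step.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch: analyze each datum on its own ("map"), then group the results
// by task name ("reduce") so they can be re-combined sensibly afterwards.
public class MapReduceSketch {
    record Input(String name, String data) {}

    // Placeholder for the real, order-independent analysis.
    static String analyze(String data) {
        return data.toUpperCase();
    }

    static Map<String, List<String>> process(List<Input> inputs) {
        return inputs.parallelStream()
                .collect(Collectors.groupingBy(
                        Input::name,
                        Collectors.mapping(in -> analyze(in.data()),
                                           Collectors.toList())));
    }
}
```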
Upvotes: 0
Reputation: 65811
You could try Heinz Kabutz's Striped Executor Service as a possible candidate.
This magical thread pool would ensure that all Runnables with the same stripeClass are executed in the order they were submitted, while StripedRunners with different stripeClasses can still execute independently.
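The striping idea itself can be sketched by giving each stripe key its own single-threaded executor (an illustration of the concept only, not the StripedExecutorService API; an unbounded executor per stripe is a simplification):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Tasks sharing a stripe key run sequentially on the same single-threaded
// executor; different stripes run in parallel on separate executors.
public class StripedDispatcher {
    private final ConcurrentHashMap<Object, ExecutorService> stripes =
            new ConcurrentHashMap<>();

    public void submit(Object stripeKey, Runnable task) {
        stripes.computeIfAbsent(stripeKey,
                                k -> Executors.newSingleThreadExecutor())
               .submit(task);
    }

    public void shutdown() {
        stripes.values().forEach(ExecutorService::shutdown);
    }
}
```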
Upvotes: 1