Reputation: 1145
I have the following problem.
I have a queue of tasks, and there are many types of tasks, like:
A, B, C, D, ...
I execute these tasks in a thread pool.
But tasks of the same type must never execute at the same time. Hence, this is bad:
Thread-1: [A, D, C, B, ...]
Thread-2: [A, C, D, B, ...]
Here two tasks of type A (or two tasks of type B) could be executed at the same time.
But this is good:
Thread-1: [A,B,A,B,...]
Thread-2: [C,D,D,C,...]
Here, tasks of the same type are always executed sequentially.
What is the easiest way to implement this functionality?
Upvotes: 3
Views: 2196
Reputation: 6051
Implement a key-ordered executor. Each task should have a key. Tasks with the same key are queued and executed one after another, while tasks with different keys are executed in parallel.
You can try to write it yourself, but it is tricky and error-prone. I can see a few bugs in the answer suggested there.
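For illustration, here is a minimal sketch of what such a key-ordered executor could look like on top of a plain java.util.concurrent.Executor. The class name KeyOrderedExecutor and its methods are hypothetical, not a library API, and the sketch leaves out shutdown handling and rejected-task handling.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

// Hypothetical sketch: runs tasks with equal keys one at a time, in submission order,
// while tasks with different keys may run in parallel on the underlying pool.
class KeyOrderedExecutor {
    private final Executor pool;
    // A key is present in this map only while one of its tasks is running;
    // the queue holds the tasks waiting behind the running one.
    private final Map<Object, Queue<Runnable>> pending = new HashMap<>();

    KeyOrderedExecutor(Executor pool) {
        this.pool = pool;
    }

    synchronized void execute(Object key, Runnable task) {
        Queue<Runnable> queue = pending.get(key);
        if (queue != null) {
            // Another task with this key is active: wait behind it.
            queue.add(task);
        } else {
            // No active task for this key: mark the key busy and start right away.
            pending.put(key, new ArrayDeque<>());
            pool.execute(() -> runThenContinue(key, task));
        }
    }

    private void runThenContinue(Object key, Runnable task) {
        try {
            task.run();
        } finally {
            // Hand the next queued task for this key to the pool, even if this one threw.
            Runnable next = pollNext(key);
            if (next != null) {
                pool.execute(() -> runThenContinue(key, next));
            }
        }
    }

    private synchronized Runnable pollNext(Object key) {
        Runnable next = pending.get(key).poll();
        if (next == null) {
            pending.remove(key); // nothing waiting: the key is idle again
        }
        return next;
    }
}
```

With the task type used as the key, a call such as executor.execute("A", taskA) never overlaps with another "A" task. Because follow-up tasks are handed to the pool lazily, the pool should not be shut down before the queued tasks have drained.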
Upvotes: 0
Reputation: 1372
Interesting problem. Two questions come to mind:
How many different types of tasks are there?
If there are relatively few, the simplest way may be to create one thread for each type and assign each incoming task to the thread for its type. As long as tasks are balanced between types (and that's a big assumption), utilization will be good enough.
What's the expected timeliness/latency for task completion?
If your problem is flexible on timeliness, you could batch incoming tasks of each kind by count or time interval, submit each batch you retire to the pool, then await completion of that batch before submitting another of the same kind.
You can adapt the second alternative to batch sizes as small as one, in which case the mechanics of awaiting completion become important for efficiency. CompletableFuture would fit the bill here; you could chain the "poll next task of type A and submit to pool" action to the task with thenRunAsync, and fire and forget the task.
You would have to maintain one external task queue per task type; the work queues of the ForkJoinPool (FJ pool) would hold in-progress tasks only. Still, this design has a good chance of dealing reasonably with imbalance in task count and workload per type.
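As a rough sketch of the batch-size-one idea, here is a variation that does not poll explicit per-type queues. It keeps the last CompletableFuture per type and appends each new task to it with thenRunAsync, so the chain itself plays the role of the per-type queue. The class PerTypeChain and its submit method are hypothetical names, and the sketch assumes one map entry per type is an acceptable cost.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical sketch: serializes tasks of the same type by chaining futures.
class PerTypeChain {
    // Last submitted future per type; a new task is appended behind it.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails =
            new ConcurrentHashMap<>();
    private final Executor pool;

    PerTypeChain(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String type, Runnable task) {
        // compute() is atomic per key, so two submitters cannot append to the same tail.
        return tails.compute(type, (key, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            return previous
                    .exceptionally(failure -> null) // a failed task must not block later ones
                    .thenRunAsync(task, pool);      // run only after the previous task finishes
        });
    }
}
```

The returned future can be ignored (fire and forget) or used to observe the outcome of that one task. Tasks of different types share the pool and run in parallel; tasks of the same type run in submission order.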
Hope this helps.
Upvotes: 0
Reputation: 76
I think you can implement your own DistributeThreadPool that dedicates one thread to each task type. It works like a topic-based publisher/subscriber structure: each type is a topic, and its thread consumes the tasks published to it.
Here is an example:
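import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

class DistributeThreadPool {
    // One dedicated worker thread per task type, looked up by type name.
    private final Map<String, TypeThread> typeCenter = new HashMap<String, TypeThread>();

    // A task tagged with its type. This class was not shown in the original listing,
    // so this definition is an assumption based on how it is used below.
    static class Worker implements Runnable {
        final String type;
        final Runnable body;

        Worker(String type, Runnable body) {
            this.type = type;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    public void execute(Worker command) {
        TypeThread thread = typeCenter.get(command.type);
        if (thread == null) {
            throw new IllegalArgumentException("Unknown task type: " + command.type);
        }
        thread.accept(command);
    }

    class TypeThread implements Runnable {
        Thread t = null;
        final LinkedBlockingDeque<Runnable> lbq = new LinkedBlockingDeque<Runnable>();

        public void accept(Runnable inRun) {
            lbq.add(inRun);
        }

        public void start() {
            t = new Thread(this);
            t.start();
        }

        @Override
        public void run() {
            try {
                // Tasks of this type run strictly one after another on this thread.
                while (!Thread.currentThread().isInterrupted()) {
                    lbq.take().run();
                }
            } catch (InterruptedException e) {
                // take() clears the interrupt flag when it throws; restore it and stop,
                // otherwise the loop would never terminate.
                Thread.currentThread().interrupt();
            }
        }
    }

    public DistributeThreadPool(String[] types) {
        for (String t : types) {
            TypeThread thread = new TypeThread();
            typeCenter.put(t, thread);
            thread.start();
        }
    }

    public static void main(String[] args) {
        // The worker threads are non-daemon and are never interrupted here,
        // so this demo keeps running after the tasks have been printed.
        DistributeThreadPool dtp = new DistributeThreadPool(new String[] {"AB", "CD"});
        List<Worker> workers = new ArrayList<Worker>();
        workers.add(new Worker("AB", () -> System.out.println(Thread.currentThread().getName() + "AB")));
        workers.add(new Worker("AB", () -> System.out.println(Thread.currentThread().getName() + "AB")));
        workers.add(new Worker("CD", () -> System.out.println(Thread.currentThread().getName() + "CD")));
        workers.add(new Worker("CD", () -> System.out.println(Thread.currentThread().getName() + "CD")));
        workers.add(new Worker("CD", () -> System.out.println(Thread.currentThread().getName() + "CD")));
        workers.forEach(e -> dtp.execute(e));
    }
}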
Upvotes: 1
Reputation: 13525
This problem can easily be solved with an actor framework like Akka.
For each type of task, create an actor.
For each separate task, create a message and send it to the actor of the corresponding type. Messages can be of type Runnable, as they probably are now, and the actor's reaction method can be
    @Override
    public void onReceive(Object msg) {
        ((Runnable) msg).run();
    }
Because an actor processes its messages one at a time, tasks of the same type never overlap, and your program will run correctly for any number of threads.
Upvotes: 1
Reputation: 8044
CompletableFuture.supplyAsync(this::doTaskA)
    .thenAccept(this::useResultFromTaskAinTaskB);
What's happening above is that Task B starts only after Task A has completed, and it receives Task A's result. With the non-async thenAccept, Task B typically runs on the thread that finished Task A (or on the calling thread if Task A has already completed), so there is no need to "get" a new thread to start running Task B.
Or you can use runAsync for Task A if you don't need any information from it, but do need to wait for it to complete before running Task B.
By default, CompletableFutures use the common thread pool (ForkJoinPool.commonPool()), but if you want more control over which thread pool gets used, you can pass your own Executor as a second argument to the async methods.
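A small sketch of passing your own Executor, assuming two stand-in methods doTaskA and doTaskB that are hypothetical and not part of any library. Both stages are scheduled on the supplied pool instead of the common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChainedTasks {
    // Hypothetical stand-ins for the real tasks.
    static String doTaskA() {
        return "result of A";
    }

    static String doTaskB(String fromA) {
        return "B used " + fromA;
    }

    // Runs Task A, then Task B with A's result, both on the given pool.
    static String runChain(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(ChainedTasks::doTaskA, pool)    // Task A on our own pool
                .thenApplyAsync(ChainedTasks::doTaskB, pool) // Task B starts after A completes
                .join();                                     // wait for the final result
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(runChain(pool)); // prints "B used result of A"
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this only orders Task B after its own Task A; it does not by itself stop two independent chains of the same type from running at the same time.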
Upvotes: 0