corvax

Reputation: 1145

How to distribute tasks between threads in ThreadPoolExecutor

I have the following problem.

I have a queue of tasks, and there are many types of tasks, such as:

A, B, C, D, ...

I execute these tasks in a thread pool.

But tasks of the same type must never execute at the same time, so this is bad:

Thread-1: [A, D, C, B, ...]
Thread-2: [A, C, D, B, ...]

Here two tasks of type A (or two of type B) could be executed at the same time.

But this is good:

Thread-1: [A,B,A,B,...]
Thread-2: [C,D,D,C,...]

This way, tasks of the same type are always executed sequentially.

What is the easiest way to implement this functionality?

Upvotes: 3

Views: 2196

Answers (5)

Anton

Reputation: 6051

Implement a key-ordered executor. Each task should have a key. Tasks with the same key are queued and executed one after another; tasks with different keys can be executed in parallel.

Implementation in netty

You can try to write it yourself, but it is tricky and error-prone. I can see a few bugs in the answer suggested there.
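To make the idea concrete, here is a minimal sketch of a key-ordered executor layered on any java.util.concurrent.Executor. The class name KeyOrderedExecutor and its execute(key, task) method are made up for illustration; this is not the Netty implementation, and it leaves out things a production version would need (for example, recovering when the pool rejects a submission).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: same-key tasks run one after another, different keys may run in parallel. */
class KeyOrderedExecutor {
    private final Executor pool; // shared worker pool
    // A key is present in this map exactly while one of its tasks is running or scheduled.
    private final Map<String, Queue<Runnable>> pending = new HashMap<>();

    KeyOrderedExecutor(Executor pool) {
        this.pool = pool;
    }

    /** Queues the task behind any unfinished task with the same key. */
    synchronized void execute(String key, Runnable task) {
        Queue<Runnable> queue = pending.get(key);
        if (queue != null) {
            queue.add(task); // key is busy: run later, in submission order
            return;
        }
        pending.put(key, new ArrayDeque<>()); // mark the key as busy
        pool.execute(() -> runAndContinue(key, task));
    }

    private void runAndContinue(String key, Runnable task) {
        try {
            task.run();
        } finally {
            final Runnable next;
            synchronized (this) {
                next = pending.get(key).poll();
                if (next == null) {
                    pending.remove(key); // nothing left: the key is idle again
                }
            }
            if (next != null) {
                // Hand the next same-key task back to the pool only after this one finished.
                pool.execute(() -> runAndContinue(key, next));
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyOrderedExecutor executor = new KeyOrderedExecutor(pool);
        String[] types = {"A", "B", "A", "C", "B", "A"};
        CountDownLatch done = new CountDownLatch(types.length);
        for (String type : types) {
            executor.execute(type, () -> {
                System.out.println(Thread.currentThread().getName() + " runs " + type);
                done.countDown();
            });
        }
        done.await(); // queued tasks reach the pool late, so wait before shutting down
        pool.shutdown();
    }
}
```

Note that no thread is tied to a key: whichever pool thread is free picks up the next task, and only the per-key queue enforces the ordering.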

Upvotes: 0

Cagatay

Reputation: 1372

Interesting problem. Two questions come to mind:

How many different types of tasks are there?

If there are relatively few, the simplest way may be to create one thread for each type and assign each incoming task to its kind of thread. As long as tasks are balanced between types (and that's a big assumption) utilization will be good enough.

What's the expected timeliness/latency for task completion?

If your problem is flexible on timeliness, you could batch incoming tasks of each kind by count or time interval, submit each batch you retire to the pool, then await completion of that batch before submitting another of the same kind.

You can adapt the second alternative to batch sizes as small as one, in which case the mechanics of awaiting completion become important for efficiency. CompletableFuture would fit the bill here; you could chain the "poll next task of type A and submit to pool" action to the task with thenRunAsync, and fire and forget the task.

You would have to maintain one external task queue per task type; the work queues of the ForkJoinPool (FJ pool) would be for in-progress tasks only. Still, this design has a good chance of dealing reasonably with imbalance in task count and workload per type.
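A rough sketch of the batch-size-one variant follows. It is a simplification rather than exactly the design above: instead of an explicit queue per type, it keeps the last CompletableFuture stage of each type and chains the next task onto it with thenRunAsync. The names PerTypeChain and submit are hypothetical, and the map of types is never cleaned up, which is fine only while the set of types stays small.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: serializes tasks of one type by chaining each onto the previous one. */
class PerTypeChain {
    private final Executor pool;
    // Last scheduled stage per type; the next task of that type is chained onto it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    PerTypeChain(Executor pool) {
        this.pool = pool;
    }

    /** Schedules the task to run on the pool after all earlier tasks of the same type. */
    CompletableFuture<Void> submit(String type, Runnable task) {
        // compute() is atomic per key, so two submitters cannot chain onto the same tail.
        return tails.compute(type, (key, tail) -> {
            CompletableFuture<Void> previous =
                tail == null ? CompletableFuture.<Void>completedFuture(null) : tail;
            return previous
                .exceptionally(error -> null) // a failed task must not block its successors
                .thenRunAsync(task, pool);
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PerTypeChain chain = new PerTypeChain(pool);
        CompletableFuture<?>[] all = {
            chain.submit("A", () -> System.out.println("A1")),
            chain.submit("B", () -> System.out.println("B1")),
            chain.submit("A", () -> System.out.println("A2")), // always printed after A1
        };
        CompletableFuture.allOf(all).join();
        pool.shutdown();
    }
}
```

The returned future is only for callers that want to wait or check for failure; ignoring it gives the fire-and-forget behaviour.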

Hope this helps.

Upvotes: 0

Chuck

Reputation: 76

I think you can implement your own DistributeThreadPool to control the threads. It works like a topic-based publisher/subscriber structure.

Here is an example:

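import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

// Worker was not shown in the original answer; this minimal definition is an assumption.
class Worker implements Runnable {
    final String type;
    private final Runnable task;

    Worker(String type, Runnable task) {
        this.type = type;
        this.task = task;
    }

    @Override
    public void run() {
        task.run();
    }
}

class DistributeThreadPool {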

Map<String, TypeThread> TypeCenter = new HashMap<String, TypeThread>();

public void execute(Worker command) {
    TypeCenter.get(command.type).accept(command);
}

class TypeThread implements Runnable{

    Thread t = null;
    LinkedBlockingDeque<Runnable> lbq = null;

    public TypeThread() {
        lbq = new LinkedBlockingDeque<Runnable>();
    }

    public void accept(Runnable inRun) {
        lbq.add(inRun);
    }

    public void start() {
        t = new Thread(this);
        t.start();
    }


    @Override
    public void run() {
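        while (!Thread.interrupted()) {
            try {
                // Run queued tasks of this type one at a time, in arrival order.
                lbq.take().run();
            } catch (InterruptedException e) {
                // take() clears the interrupt flag when it throws; restore it so the loop exits.
                Thread.currentThread().interrupt();
            }
        }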
    }
}

public DistributeThreadPool(String[] Types) {
    for (String t : Types) {
        TypeThread thread = new TypeThread();
        TypeCenter.put(t, thread);
        thread.start();
    }
}

public static void main(String [] args) {
        DistributeThreadPool dtp = new DistributeThreadPool(new String[] {"AB","CD"});

        Worker w1 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB"));
        Worker w2 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB"));
        Worker w3 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD"));
        Worker w4 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD"));
        Worker w5 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD"));

        List<Worker> workers = new ArrayList<Worker>();
        workers.add(w1);
        workers.add(w2);
        workers.add(w3);
        workers.add(w4);
        workers.add(w5);

        workers.forEach(e->dtp.execute(e));
    }
}

Upvotes: 1

Alexei Kaigorodov

Reputation: 13525

This problem can easily be solved with an actor framework like Akka.

For each type of task, create an actor.

For each separate task, create a message and send it to the actor of corresponding type. Messages can be of type Runnable, as they probably are now, and the actor's reaction method can be @Override public void onReceive(Object msg) { ((Runnable)msg).run(); }

This way your program will run correctly for any number of threads.

Upvotes: 1

john16384

Reputation: 8044

CompletableFuture.supplyAsync(this::doTaskA)  
                 .thenAccept(this::useResultFromTaskAinTaskB);

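What's happening above is that Task A and the related Task B normally run in the same thread, one after the other, with no need to "get" a new thread to start running Task B. (If Task A has already completed by the time thenAccept is called, Task B runs in the calling thread instead.)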

Or you can use runAsync for Task A if you don't need any information from it, but do need to wait for it to complete before running Task B.

By default, CompletableFutures use the common thread pool (ForkJoinPool.commonPool()), but if you want more control over which thread pool gets used, you can pass a second argument to the async methods: your own Executor backed by your own thread pool.
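For example, passing your own pool could look roughly like this. The class name OwnExecutorDemo, the method runAThenB and the strings are placeholders I made up; only supplyAsync(supplier, executor), thenAccept and join are the real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OwnExecutorDemo {
    /** Runs task A on the given executor, then task B with A's result; returns what B produced. */
    static String runAThenB(Executor executor) {
        StringBuilder output = new StringBuilder();
        CompletableFuture
            .supplyAsync(() -> "resultOfA", executor)            // task A, on our executor
            .thenAccept(a -> output.append("B used ").append(a)) // task B, after A
            .join();                                             // wait for both to finish
        return output.toString();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(runAThenB(pool)); // prints "B used resultOfA"
        } finally {
            pool.shutdown();
        }
    }
}
```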

Upvotes: 0
