Michael

Reputation: 42100

Concurrent request processing in Java with constraints

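Suppose I need to process requests of 3 types: A, B, and C as follows:

- Requests are processed concurrently.
- There are at most K requests to be processed concurrently at the same time.
- Requests of the same type cannot be processed at the same time.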

More generally, the number of types is N and the number of concurrent requests is K <= N.

How would you implement it in Java with java.util.concurrent?

Upvotes: 7

Views: 1495

Answers (4)

deprecated

Reputation: 5252

The domain of your problem can be modelled with two data structures, which I have called pending (a map from types to unbounded queues of tasks; this is where tasks wait to be run) and running (which holds at most one task per type, either ready to be run or actually being run by an Executor).

The K constraint must be applied to running: it has at most K Type-to-Task mappings.

The key point is that the number of threads you allocate for task processing is entirely orthogonal to the concurrency constraint handling: your thread pool choice should be dictated (among other things) by the kind of tasks to be performed (I/O-bound or CPU-bound?), not by the concurrency constraints.

An implementation:

import java.util.*;
import java.util.concurrent.*;

public class Foo {

    enum TaskType { A, B, C }

    class Task {
        TaskType type;
        Runnable runnable;
        volatile boolean running;
    }

    Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();

    Map<TaskType, Task> running = new HashMap<TaskType, Task>();

    // Any Executor implementation works; the choice is irrelevant to the problem
    ExecutorService executor = Executors.newCachedThreadPool();

    /** Chooses a random type that has pending tasks and is not already running, or null if there is none. */
    TaskType choosePending(){
        List<TaskType> candidates = new ArrayList<TaskType>();
        for (Map.Entry<TaskType, Queue<Task>> e : pending.entrySet()){
            // skip types with nothing queued and types that already occupy a running slot
            if (!e.getValue().isEmpty() && !running.containsKey(e.getKey())){
                candidates.add(e.getKey());
            }
        }
        if (candidates.isEmpty()) return null;
        Collections.shuffle(candidates);
        return candidates.get(0);
    }

    // note that max concurrency != parallelism level (which the executor is responsible for)
    final int MAX_CONCURRENCY = 3;

    void produce(){
        synchronized(running){
            if (running.size() < MAX_CONCURRENCY) {
                synchronized (pending){
                    TaskType t = choosePending();
                    if (t != null) {
                        running.put(t, pending.get(t).remove());
                    }
                }
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) produce();
            }
        }).start();
    }

    Task chooseRunning(){
         for (Task t : running.values()){
             if (!t.running){
                 return t;
             }
         }
        return null;
    }

    void consume(){
        final Task t;
        synchronized (running){
            t = chooseRunning();
            if (t != null){
                t.running = true;
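                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            t.runnable.run();
                        } finally {
                            // running is keyed by type, so remove by type;
                            // finally frees the slot even if the task throws
                            synchronized (running) {
                                running.remove(t.type);
                            }
                        }
                    }
                });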
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) consume();
            }
        }).start();
    }

}

Upvotes: 0

Mikhail

Reputation: 4223

You cannot process K requests at the same time; that would break the second rule. The maximum number of concurrent requests is the number of types, which in your case is three. So make three queues and attach them to three threads. That's the only way. Executors.newSingleThreadExecutor implements this technique.

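public static void main(String[] args) {
    int K = 2; // number of single-threaded executors, i.e. the maximum concurrency
    int N = 3; // number of request types
    List<Executor> executors = new ArrayList<Executor>(K);
    for(int i = 0; i < K; i++){
        executors.add(Executors.newSingleThreadExecutor());
    }
    // assign each type to one executor, round-robin, so a type never runs on two threads at once
    Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(N);
    int i = 0;
    for(Type t : Type.values()){
        typeExecutors.put(t, executors.get(i++ % executors.size()));
    }
}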

enum Type{
    T1, T2, T3
}
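To make the dispatch step explicit, here is a minimal sketch of the same idea wrapped in a class. The class name SharedSingleThreadDispatcher and the methods dispatch and shutdown are made up for illustration; only the round-robin mapping of types onto single-threaded executors comes from the snippet above.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical wrapper: k single-threaded executors shared by all request types.
class SharedSingleThreadDispatcher {

    enum Type { T1, T2, T3 }

    private final List<ExecutorService> executors = new ArrayList<ExecutorService>();
    private final Map<Type, ExecutorService> typeExecutors =
            new EnumMap<Type, ExecutorService>(Type.class);

    SharedSingleThreadDispatcher(int k) {
        // k threads in total, so at most k requests run at the same time.
        for (int i = 0; i < k; i++) {
            executors.add(Executors.newSingleThreadExecutor());
        }
        // Each type is pinned to exactly one executor (round-robin),
        // so two requests of the same type can never overlap.
        int i = 0;
        for (Type t : Type.values()) {
            typeExecutors.put(t, executors.get(i++ % k));
        }
    }

    void dispatch(Type type, Runnable request) {
        typeExecutors.get(type).execute(request);
    }

    void shutdown() {
        for (ExecutorService e : executors) {
            e.shutdown();
        }
    }
}
```

One trade-off of the static mapping: when K < N, types that share an executor wait for each other even if another executor is idle.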

Upvotes: 2

Gyanendra Dwivedi

Reputation: 5557

Requests are processed concurrently.

You can use an ExecutorService.

There are at most K requests to be processed concurrently at the same time.

Set the executor's maximum number of threads to K.

Requests of the same type cannot be processed at the same time.

Consider having a separate lock for each request type. Just make sure that if a thread cannot acquire the lock for a request's type within a designated time, it gives up on that request and moves on to the next task.
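The three points above could be sketched roughly as follows. This is only one possible reading, not a tuned implementation: the class PerTypeLockProcessor, its methods, the 10 ms timeout, and the choice to requeue a request whose lock is busy are all assumptions made for illustration.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: a pool of k threads plus one lock per request type.
class PerTypeLockProcessor {

    enum RequestType { A, B, C }

    private final ExecutorService executor;
    private final Map<RequestType, ReentrantLock> locks =
            new EnumMap<RequestType, ReentrantLock>(RequestType.class);

    PerTypeLockProcessor(int k) {
        // At most k worker threads, hence at most k requests in flight.
        executor = Executors.newFixedThreadPool(k);
        for (RequestType t : RequestType.values()) {
            locks.put(t, new ReentrantLock());
        }
    }

    void submit(final RequestType type, final Runnable request) {
        executor.execute(new Runnable() {
            public void run() {
                ReentrantLock lock = locks.get(type);
                boolean acquired;
                try {
                    // Wait only briefly (assumed timeout) for the type's lock.
                    acquired = lock.tryLock(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // this sketch drops the request when interrupted
                }
                if (!acquired) {
                    // Same type is busy: requeue and let this worker take the next task.
                    submit(type, request);
                    return;
                }
                try {
                    request.run();
                } finally {
                    lock.unlock();
                }
            }
        });
    }

    // Call only after all requests have finished; a requeue after shutdown is rejected.
    void shutdown() {
        executor.shutdown();
    }
}
```

Note that requeuing does not preserve the submission order of requests of the same type, and a busy type can make workers spend time waiting on the lock instead of running other types.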

Upvotes: 0

RamonBoza

Reputation: 9038

I would create three executors with Executors.newFixedThreadPool(1)

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)

and use a factory to delegate each execution to the executor for its request type.

// ThreadFactory is a user-defined factory here, not java.util.concurrent.ThreadFactory
ExecutorService executor = ThreadFactory.getExecutorForRequest(type);
executor.execute(request);
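A minimal sketch of what such a factory might look like, assuming an enum of request types. The names ExecutorPerType and RequestType are hypothetical; getExecutorForRequest is taken from the snippet above, but its body here is a guess.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical factory: one single-threaded pool per request type.
class ExecutorPerType {

    enum RequestType { A, B, C }

    private final Map<RequestType, ExecutorService> executors =
            new EnumMap<RequestType, ExecutorService>(RequestType.class);

    ExecutorPerType() {
        for (RequestType t : RequestType.values()) {
            // One thread per type serializes requests of that type.
            executors.put(t, Executors.newFixedThreadPool(1));
        }
    }

    // Always returns the same executor for a given type.
    ExecutorService getExecutorForRequest(RequestType type) {
        return executors.get(type);
    }

    void shutdown() {
        for (ExecutorService e : executors.values()) {
            e.shutdown();
        }
    }
}
```

On its own this allows up to N requests (one per type) to run at once, so it satisfies the limit of K only when K equals the number of types.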

Upvotes: 0
