serg kunz

Reputation: 531

How to concurrently process elements in a Collection in Java

I need to process the elements of a Collection instance concurrently. In other words, instead of iterating over the Collection instance sequentially:

for (Someclass elem : coll){
     process(elem);
}

I’d like to process those elements concurrently, with something like ConcurrentCollectionExecutor(coll, new Callable{…}, numberOfThreads). Also, the number of simultaneous threads should be fixed.

Does a flexible pattern for this already exist?

Upvotes: 15

Views: 3263

Answers (4)

Jean Logeart

Reputation: 53819

A good solution would be:

  1. Instantiate an ArrayBlockingQueue containing the elements to process
  2. Instantiate an ExecutorService to execute your processing concurrently
  3. Instantiate your Runnables, giving them the ArrayBlockingQueue as a parameter
  4. Implement the run method: while there are elements in the queue, poll them and process them
  5. Submit your Runnables to the ExecutorService

The code:

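// Requires: java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, ExecutorService, Executors}
// Note: ArrayBlockingQueue rejects a capacity below 1, so coll must not be empty.
BlockingQueue<Someclass> toProcess =
    new ArrayBlockingQueue<Someclass>(coll.size(), false, coll);
ExecutorService es = Executors.newFixedThreadPool(numberOfThreads);
for (int count = 0; count < numberOfThreads; ++count) {
    es.submit(new MyRunnable(toProcess));
}
es.shutdown(); // accept no new tasks; the submitted workers still drain the queue


private static class MyRunnable implements Runnable {
    private final BlockingQueue<Someclass> toProcess;

    public MyRunnable(BlockingQueue<Someclass> toProcess) {
        this.toProcess = toProcess;
    }

    @Override
    public void run() {
        Someclass element;
        // poll() returns null once the queue is empty, which ends this worker
        while ((element = toProcess.poll()) != null) {
            process(element);
        }
    }
}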

Upvotes: 8

CloudyMarble

Reputation: 37566

I don't know of any pattern for this, but one idea is to divide the collection's elements among the threads, so that each thread gets X elements to process. For example:

The collection has 20 elements and you call your function with 4 threads; internally you start them like this:

thread1 gets the elements [0 .. 4]
thread2 gets the elements [5 .. 9]
thread3 gets the elements [10 .. 14]
thread4 gets the elements [15 .. 19]

Note that deleting elements from the collection during processing could cause problems, especially if thread4 tries to access element[19] while there are fewer than 20 elements in the collection.

EDIT:

As brain mentioned, depending on how long each element takes to process, this idea can be inefficient: if processing one of the first 5 elements took 10 seconds but the other elements took only 0.5 seconds each, thread1 would stay busy while the other threads finished early, so the work would not run in parallel for very long.
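
The index split described above could be sketched roughly as follows. This is a minimal sketch rather than an established pattern: it assumes the elements live in a List (so they can be addressed by index) and that the list is not modified while the workers run. The names ChunkedProcessor, ranges and processInChunks are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper; the class and method names are illustrative only.
final class ChunkedProcessor {

    /** Splits the index range [0, size) into at most `parts` contiguous half-open ranges. */
    static List<int[]> ranges(int size, int parts) {
        List<int[]> result = new ArrayList<>();
        int chunk = (size + parts - 1) / parts; // ceiling division
        for (int from = 0; from < size; from += chunk) {
            result.add(new int[] {from, Math.min(from + chunk, size)});
        }
        return result;
    }

    /** Applies `action` to every element, one chunk per task, and waits for all tasks. */
    static <T> void processInChunks(List<T> items, int numberOfThreads, Consumer<T> action) {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
        for (int[] r : ranges(items.size(), numberOfThreads)) {
            List<T> slice = items.subList(r[0], r[1]); // a view of the list, not a copy
            pool.execute(() -> slice.forEach(action));
        }
        pool.shutdown(); // no more tasks; already submitted chunks still run
        try {
            // The one-hour limit is an arbitrary assumption for this sketch.
            if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
                throw new IllegalStateException("processing did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

With 20 elements and 4 threads, ranges(20, 4) yields the half-open ranges [0, 5), [5, 10), [10, 15) and [15, 20), which matches the split listed above. The action passed in must itself be thread-safe, since several chunks run at the same time.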

Upvotes: 0

Andremoniy

Reputation: 34900

Below is a "hand-made" version of such an executor class. Note that you pass it not an instance of Callable (or Runnable) but the Class object of the processor class.

import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ConcurrentCollectionExecutor<T> {

    private final Collection<T> collection;
    // Wildcard so that any class implementing Runnable can be passed
    private final Class<? extends Runnable> processor;
    private final int numberOfThreads;
    private final Executor executor;

    public ConcurrentCollectionExecutor(Collection<T> collection, Class<? extends Runnable> processor, int numberOfThreads) {
        this.collection = collection;
        this.processor = processor;
        this.numberOfThreads = numberOfThreads;
        this.executor = Executors.newFixedThreadPool(numberOfThreads);
    }

    public void run() {
        try {
            Constructor<? extends Runnable> constructor = null;
            for (T t : collection) {
                if (constructor == null) {
                    // Looks up a public constructor whose single parameter type is exactly t's runtime class
                    constructor = processor.getConstructor(t.getClass());
                }
                executor.execute(constructor.newInstance(t));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
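
The reflective lookup in run() has a constraint worth spelling out: Class.getConstructor matches formal parameter types exactly, so the processor class needs a public constructor whose parameter type is precisely the runtime class of the elements. The sketch below illustrates this in isolation, without a thread pool. ExactProcessor, SupertypeProcessor and instantiate are hypothetical names introduced only for this illustration.

```java
import java.lang.reflect.Constructor;

// Hypothetical demo class; not part of the executor above.
final class ReflectiveLookupDemo {

    /** Processor whose constructor parameter is exactly String. */
    static final class ExactProcessor implements Runnable {
        private final String elem;
        String result;

        public ExactProcessor(String elem) {
            this.elem = elem;
        }

        @Override
        public void run() {
            result = elem.toUpperCase(); // placeholder work
        }
    }

    /** Processor declared against a supertype of the elements. */
    static final class SupertypeProcessor implements Runnable {
        public SupertypeProcessor(CharSequence elem) {
        }

        @Override
        public void run() {
        }
    }

    /** Same lookup as in run() above; returns null if no exactly matching public constructor exists. */
    static Runnable instantiate(Class<? extends Runnable> processor, Object elem) {
        try {
            Constructor<? extends Runnable> c = processor.getConstructor(elem.getClass());
            return c.newInstance(elem);
        } catch (NoSuchMethodException e) {
            return null; // e.g. the constructor takes CharSequence but elem is a String
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Here instantiate(ExactProcessor.class, "abc") yields a Runnable, while instantiate(SupertypeProcessor.class, "abc") returns null, because String.class is not the same as CharSequence.class.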

Upvotes: 2

stepanian

Reputation: 11433

Make the process method a run() method in a class called MyRunnable that implements Runnable and whose constructor takes elem as input and stores it as an instance variable. Then use:

ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
for (Someclass elem : coll) {
    Runnable worker = new MyRunnable(elem);
    executor.execute(worker);
}
executor.shutdown(); // accept no new tasks; queued workers still run to completion
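
For completeness, the MyRunnable class described above, together with one way of waiting for the submitted tasks, might look like the sketch below. Someclass is reduced to a stand-in with a single int field, and the work done in run() (adding to a shared counter) is only a placeholder for the real process(elem) logic. Both are assumptions, as are the PerElementDemo and processAll names and the one-minute timeout.

```java
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class wrapping the pieces named in the answer.
final class PerElementDemo {

    /** Stand-in for the question's Someclass (assumed shape). */
    static final class Someclass {
        final int value;

        Someclass(int value) {
            this.value = value;
        }
    }

    /** Placeholder result of the "processing"; thread-safe because tasks run concurrently. */
    static final AtomicInteger total = new AtomicInteger();

    /** One task per element: the element is stored in the constructor and used in run(). */
    static final class MyRunnable implements Runnable {
        private final Someclass elem;

        MyRunnable(Someclass elem) {
            this.elem = elem;
        }

        @Override
        public void run() {
            total.addAndGet(elem.value); // the body of process(elem) would go here
        }
    }

    /** Submits one task per element and returns true if all finished within the timeout. */
    static boolean processAll(Collection<Someclass> coll, int numberOfThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        for (Someclass elem : coll) {
            executor.execute(new MyRunnable(elem));
        }
        executor.shutdown(); // no new tasks; queued ones still run
        try {
            return executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The pool never runs more than numberOfThreads tasks at once; the remaining tasks wait in the executor's internal queue, so slow elements do not leave other threads idle.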

Upvotes: 8
