Reputation: 3040
I have a job interface IJob like this:
public interface IJob {
public void execute();
}
In my application I have multiple classes implementing this interface, like IJob1 and IJob2:
public class IJob1 implements IJob{
@Override
public void execute(){
System.out.println("IJob1\n");
}
}
public class IJob2 implements IJob{
@Override
public void execute(){
System.out.println("IJob2\n");
}
}
Because the number of jobs to run increases steadily, I want to create a new job that takes a list of IJob instances and runs them in parallel. The number of threads that the new implementation uses to run the jobs in parallel should be configurable. If one of the jobs throws an exception, all other currently running jobs should be stopped as well, and the execute() method should pass the exception to the caller.
I wrote this, but I am not able to run the jobs or to check whether an error occurred:
import java.util.LinkedList;
public class WorkQueue
{
private final int nThreads;
private final IJob[] threads;
private final LinkedList queue;
public WorkQueue(int nThreads)
{
this.nThreads = nThreads;
queue = new LinkedList();
threads = new IJob[nThreads];
for (int i=0; i<nThreads; i++) {
threads[i] = new IJob();
threads[i].execute();
}
}
public void execute(Runnable r) {
synchronized(queue) {
queue.addLast(r);
queue.notify();
}
}
private class PoolWorker extends Thread {
public void run() {
Runnable r;
while (true) {
synchronized(queue) {
while (queue.isEmpty()) {
try
{
queue.wait();
}
catch (InterruptedException ignored)
{
}
}
r = (Runnable) queue.removeFirst();
}
// If we don't catch RuntimeException,
// the pool could leak threads
try {
r.run();
}
catch (RuntimeException e) {
// You might want to log something here
}
}
}
}
}
Could you please give me some pointers on how to proceed? Thank you very much.
Upvotes: 1
Views: 1119
Reputation: 2824
I would recommend a managed thread pool. A typical pattern is to use the Java Executors factory class to obtain an ExecutorService. An ExecutorService is usually implemented with a pool of threads plus a job queue. ExecutorService has methods such as shutdownNow(), which "Attempts to stop all actively executing tasks". That sounds like what you want to do.
Example:
List<Callable<Result>> tasks = new ArrayList<>();
for (final Object job: jobs) {
final Callable<Result> task = new Callable<Result>() {
@Override
public Result call() throws Exception {
// Do job here
return result;
}
};
tasks.add(task);
}
final int numOfThreads = 20;
final ExecutorService executor = Executors.newFixedThreadPool(numOfThreads);
try {
executor.invokeAll(tasks);
} finally {
executor.shutdownNow();
}
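To connect the snippet above to the question, here is a minimal sketch of a composite job. The class name ParallelJob and its constructor parameters are my own assumptions, not something from the question. It uses an ExecutorCompletionService so that results are checked in completion order: the first failure is rethrown to the caller, and shutdownNow() in the finally block interrupts whatever is still running. Keep in mind that shutdownNow() is best effort; a job only stops early if it reacts to interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Same shape as the interface from the question, repeated so the sketch compiles on its own.
interface IJob {
    void execute();
}

// Hypothetical composite job: runs the given jobs on a fixed-size thread pool.
class ParallelJob implements IJob {
    private final List<IJob> jobs;
    private final int nThreads; // configurable number of worker threads

    ParallelJob(List<IJob> jobs, int nThreads) {
        this.jobs = new ArrayList<>(jobs);
        this.nThreads = nThreads;
    }

    @Override
    public void execute() {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
        try {
            for (IJob job : jobs) {
                completion.submit(() -> { job.execute(); return null; });
            }
            // take() hands back futures in completion order, so a failing job
            // is noticed as soon as it finishes, not after all earlier ones.
            for (int i = 0; i < jobs.size(); i++) {
                completion.take().get();
            }
        } catch (ExecutionException e) {
            // Unwrap and pass the job's own exception to the caller.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for jobs", e);
        } finally {
            // Interrupts running jobs and discards queued ones (best effort).
            executor.shutdownNow();
        }
    }
}
```

Usage would look like new ParallelJob(Arrays.asList(new IJob1(), new IJob2()), 4).execute(), with IJob1 and IJob2 being the classes from the question.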
Upvotes: 5
Reputation: 533462
I want to create a new job that takes a list of IJob instances and runs them in parallel. The number of threads that the new implementation uses to run the jobs in parallel should be configurable. If one of the jobs throws an exception, all other currently running jobs should be stopped as well, and the execute() method should pass the exception to the caller.
This is where using parallelStream() will make your life simpler. You can do
list.parallelStream()
    .forEach(IJob::execute);
You might find you don't need a framework at all and you can merge this code into the caller. e.g.
Map<String, List<T>> map = dataSet.parallelStream()
    .map(t -> t.transform1())
    .filter(t -> t.isGood())
    .collect(Collectors.groupingByConcurrent(t -> t.getKey()));
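One thing the parallelStream() call above does not cover by itself is the configurable thread count, since parallel streams normally run on the common ForkJoinPool. A commonly used workaround, sketched below, is to start the stream from inside a dedicated ForkJoinPool. Note the assumptions: StreamJobs and runAll are hypothetical names, and the fact that the stream then runs on that pool is observed behaviour of current JDKs, not something the stream API documents. An exception thrown by a job reaches the caller, but other jobs are only stopped on a best-effort basis.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

// Same shape as the interface from the question, repeated so the sketch compiles on its own.
interface IJob {
    void execute();
}

// Hypothetical helper, not part of any library.
class StreamJobs {
    static void runAll(List<IJob> jobs, int nThreads) {
        // Dedicated pool whose parallelism is the configurable thread count.
        ForkJoinPool pool = new ForkJoinPool(nThreads);
        try {
            // Assumption: a parallel stream started inside a ForkJoinPool task
            // runs on that pool instead of the common pool.
            pool.submit(() -> jobs.parallelStream().forEach(IJob::execute)).get();
        } catch (ExecutionException e) {
            // Unwrap and pass the job's exception to the caller.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) throw (RuntimeException) cause;
            if (cause instanceof Error) throw (Error) cause;
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for jobs", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

If a fixed thread count is not actually required, the plain list.parallelStream().forEach(IJob::execute) form is shorter and also passes the exception on to the caller.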
Upvotes: 4