Eduardo

Reputation: 2341

What is the easiest way to parallelize a task in java?

Say I have a task like:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?

I do not need an answer that strictly matches the code above, just a general answer. But if you need more info: my tasks are IO bound, this is for a Spring web application, and the tasks are going to be executed within an HTTP request.

Upvotes: 67

Views: 115007

Answers (10)

YKun and coding

Reputation: 323

I know it's an old thread, but since RxJava came out (it's now at v3), my favorite way to do parallel programming is through its flatMap operator, as in the following few lines (although it is not very intuitive at first sight):

// Assume we're in the main thread at the moment
Flowable.create(...) // upstream data provider, main thread
  .map(...) // some transformers, main thread
  .filter(...) // some filter, main thread
  .flatMap(data -> Flowable.just(data)
               .subscribeOn(Schedulers.from(...your ExecutorService for the sub-workers...), true) // true = requestOn: upstream requests are also issued on this scheduler
               .doOnNext(this::process) // runs on a worker thread
           , MAX_CONCURRENT) // max number of concurrent workers
  .subscribe();

You can check the RxJava 3 Flowable Javadoc to understand the operators. A simple example:

Flowable.range(1, 100)
        .map(Object::toString)
        .flatMap(i -> Flowable.just(i)
                .doOnNext(j -> {
                    System.out.println("Current thread is " + Thread.currentThread().getName());
                    Thread.sleep(100);
                })
                .subscribeOn(Schedulers.io()), true, 10) // delayErrors = true, maxConcurrency = 10
        .subscribe(
                value -> log.info("We consumed {}", value),
                throwable -> log.error("We met errors", throwable),
                () -> log.info("The stream completed!!!"));

And for your case:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

We could try:

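List<Result> list = Flowable.fromIterable(objects)
        // fromCallable defers compute() until subscription, so it runs on the io() scheduler
        // (Flowable.just(compute(obj)) would call compute() eagerly on the calling thread)
        .flatMap(obj -> Flowable.fromCallable(() -> compute(obj))
                        .subscribeOn(Schedulers.io()),
                true, YOUR_CONCURRENCY_NUMBER)
        .toList()       // results arrive in completion order, not input order
        .blockingGet(); // wait for all results, since we are inside an HTTP request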

Bonus: if you need some routing, for example all odd numbers going to worker1 and all even numbers to worker2, RxJava can achieve that easily by combining the groupBy and flatMap operators. I won't go into detail about them here. Enjoy playing :)))
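For comparison, the same "odd numbers to one worker, even numbers to another" routing can be sketched without RxJava, using one single-thread executor per key. This is a swapped-in plain-JDK technique, not RxJava's groupBy API; the class name `KeyedWorkers`, the modulo-2 key, and recording thread names as the "work" are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class KeyedWorkers {
    // Routes each number to the worker that owns its key (0 = even, 1 = odd)
    // and returns, per key, the names of the threads that processed it.
    static Map<Integer, Set<String>> run(List<Integer> numbers)
            throws InterruptedException, ExecutionException {
        // One single-thread executor per key: items sharing a key run one after
        // another, in submission order, on that executor's only thread.
        ExecutorService[] workers = {
            Executors.newSingleThreadExecutor(),
            Executors.newSingleThreadExecutor()
        };
        Map<Integer, Set<String>> threadsPerKey = new ConcurrentHashMap<>();
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (int n : numbers) {
                int key = Math.floorMod(n, 2);
                pending.add(workers[key].submit(() -> {
                    // Stand-in for the real per-item work: record the worker thread.
                    threadsPerKey
                            .computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
                            .add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every item; rethrows a task's failure
            }
        } finally {
            for (ExecutorService w : workers) {
                w.shutdown();
            }
        }
        return threadsPerKey;
    }

    public static void main(String[] args) throws Exception {
        // Expect exactly one thread name for key 0 and a different one for key 1.
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
    }
}
```

Keys run in parallel with each other, while items within one key stay sequential, which is the property the groupBy/flatMap combination is usually chosen for.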

Upvotes: 0

i000174

Reputation: 1267

With Java 8 and later, you can use a parallelStream on the collection to achieve this:

List<T> objects = ...;

List<Result> result = objects.parallelStream()
        .map(object -> compute(object))
        .collect(Collectors.toList());

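Note: because a List is an ordered source, collect(Collectors.toList()) keeps the encounter order, so the result list lines up with the objects list. What is not ordered is the execution: the compute() calls themselves run concurrently, in no particular order, on several threads.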

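Details on how to set up the right number of threads are available in the Stack Overflow question "How many threads are spawned in parallelStream in Java 8?". By default, parallel streams run on the common ForkJoinPool, whose parallelism is the number of available processors minus one, which is usually too small for IO-bound tasks like yours.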
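To check the ordering behaviour yourself, here is a small runnable sketch. `compute()` is a hypothetical stand-in that squares its input and records which thread ran it; the class name is illustrative.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamOrder {
    // Names of the threads that executed compute(), collected thread-safely.
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical compute(): squares its input and records the current thread.
    static int compute(int x) {
        THREADS.add(Thread.currentThread().getName());
        return x * x;
    }

    static List<Integer> run(List<Integer> objects) {
        // map() calls may run concurrently and in any order, but collect()
        // assembles the results in the source list's encounter order.
        return objects.parallelStream()
                .map(ParallelStreamOrder::compute)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
        List<Integer> output = run(input);
        System.out.println(output.subList(0, 5)); // prints [1, 4, 9, 16, 25]
        System.out.println("threads used: " + THREADS.size());
    }
}
```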

Upvotes: 9

walkeros

Reputation: 4942

A neat way is to utilize ExecutorCompletionService.

Say you have the following code (as in your example):

public static void main(String[] args) {
  List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
  List<List<Character>> list = new ArrayList<>();

  for (char letter : letters) {
    List<Character> result = computeLettersBefore(letter);
    list.add(result);
  }

  System.out.println(list);
}

private static List<Character> computeLettersBefore(char letter) {
  return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
}

Now, to execute the tasks in parallel, all you need to do is create an ExecutorCompletionService backed by a thread pool, then submit the tasks and read the results. Since ExecutorCompletionService uses a LinkedBlockingQueue under the hood, each result becomes available for pickup as soon as its task completes (if you run the code, you will notice that the results come back in completion order, not in submission order):

public static void main(String[] args) throws InterruptedException, ExecutionException {
  final ExecutorService threadPool = Executors.newFixedThreadPool(3);
  try {
    final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);

    final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
      completionService.submit(() -> computeLettersBefore(letter));
    }

    // One take() per submitted task; each call blocks until the next task completes
    for (int i = 0; i < letters.size(); i++) {
      final List<Character> result = completionService.take().get();
      list.add(result);
    }

    System.out.println(list);
  } finally {
    threadPool.shutdownNow(); // always release the pool threads
  }
}

  private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
  }
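To see the "completion order, not submission order" behaviour deterministically, a small sketch can submit tasks with different sleep times, slowest first. The class name and the sleep durations are illustrative assumptions; each task gets its own thread so that only the sleep time decides which one finishes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Submits one task per delay; each task sleeps for its delay and returns it.
    // Results are taken from the completion service, i.e. in completion order.
    static List<Long> run(long... delaysMillis) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, delaysMillis.length));
        try {
            ExecutorCompletionService<Long> cs = new ExecutorCompletionService<>(pool);
            for (long d : delaysMillis) {
                cs.submit(() -> {
                    Thread.sleep(d);
                    return d;
                });
            }
            List<Long> finished = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                finished.add(cs.take().get()); // blocks until the next task completes
            }
            return finished;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Submitted slowest first; the shortest sleep should be taken first.
        System.out.println(run(600, 300, 0));
    }
}
```

If you need the results back in input order instead, keep the Future returned by each submit() in a list and call get() on them in that order.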

Upvotes: 1

overthink

Reputation: 24443

I would recommend taking a look at ExecutorService.

In particular, something like this:

ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

Note that using newCachedThreadPool could be bad if objects is a big list. A cached thread pool could create a thread per task! You may want to use newFixedThreadPool(n) where n is something reasonable (like the number of cores you have, assuming compute() is CPU bound).

Here's full code that actually runs:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
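In a web request you often want an upper bound on how long you wait. ExecutorService.invokeAll also has an overload that takes a timeout and cancels every task that has not finished when it expires. A minimal sketch, assuming Thread.sleep as a stand-in for an IO-bound compute(); the class name, the pool size of 4, and returning null for timed-out tasks are illustrative choices:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllWithTimeout {
    // Runs one task per delay and waits at most timeoutMillis in total.
    // Returns each task's value, or null for tasks that did not finish in time.
    static List<Long> run(long timeoutMillis, long... delaysMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (long d : delaysMillis) {
                tasks.add(() -> {
                    Thread.sleep(d); // stand-in for an IO-bound compute()
                    return d;
                });
            }
            // Futures come back in the same order as the tasks; any task still
            // running when the timeout expires is cancelled.
            List<Future<Long>> futures = exec.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<Long> results = new ArrayList<>();
            for (Future<Long> f : futures) {
                results.add(f.isCancelled() ? null : f.get());
            }
            return results;
        } finally {
            exec.shutdownNow(); // interrupts any task that is still sleeping
        }
    }

    public static void main(String[] args) throws Exception {
        // The 5000 ms task exceeds the 1000 ms budget and is cancelled.
        System.out.println(run(1000, 10, 20, 5000));
    }
}
```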

Upvotes: 88

mkamowski

Reputation: 334

I too was going to mention an executor class. Here is some example code that you would place in that executor class.

    private static final ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private final List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables() {
        this.callableList.clear();
    }

    // Runs all queued callables and waits for them, discarding their results.
    // Use either this or getResult(): calling both runs every task twice.
    public void executeThreads() {
        try {
            threadLauncher.invokeAll(this.callableList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    // Runs all queued callables in parallel, waits for them, and returns
    // their results in the order the callables were added.
    public Object[] getResult() {
        Object[] resultArray = new Object[this.callableList.size()];
        try {
            List<Future<Object>> resultList = threadLauncher.invokeAll(this.callableList);
            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch (ExecutionException e) {
            e.printStackTrace(); // a task threw an exception; handle as appropriate
        }
        return resultArray;
    }

Then to use it you would make calls to the executor class to populate and execute it.

for (Object object : objects) {
    executor.addCallable(() -> compute(object)); // do this once for each task
}
Object[] results = executor.getResult();

Upvotes: 0

Jonathan Feinberg

Reputation: 45324

Here's something I use in my own projects:

public class ParallelTasks
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Which prints a bit over 2000 on my dual-core box: the pool has one thread per core, so the four one-second tasks run in two rounds of two.

Upvotes: 1

Adam Goode

Reputation: 7468

For a more detailed answer, read Java Concurrency in Practice and use java.util.concurrent.
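As one small taste of java.util.concurrent (Java 8+), here is a sketch using CompletableFuture.supplyAsync with an explicit executor. `compute()` is a hypothetical IO-bound stand-in that sleeps for 100 ms, and the 10-thread pool in main is an arbitrary assumption. Passing your own executor matters for IO-bound work: without one, supplyAsync uses ForkJoinPool.commonPool(), which is sized for CPU-bound tasks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class CompletableFutureSketch {
    // Hypothetical IO-bound compute(): sleeps to simulate waiting on IO.
    static String compute(int id) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result-" + id;
    }

    static List<String> computeAll(List<Integer> ids, ExecutorService executor) {
        // Start every compute() on the given executor without blocking.
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> compute(id), executor))
                .collect(Collectors.toList());
        // join() waits for each one; results keep the order of ids.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newFixedThreadPool(10);
        try {
            System.out.println(computeAll(Arrays.asList(1, 2, 3), io)); // prints [result-1, result-2, result-3]
        } finally {
            io.shutdown();
        }
    }
}
```

Collecting the futures into a list before calling join() is deliberate: in a single lazy stream pipeline, each future would be joined before the next one is even started.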

Upvotes: 1

fastcodejava

Reputation: 41097

One can simply create a few threads and get the results.

Mythread t = new Mythread(object); // hypothetical Thread subclass that stores its result
t.start();
t.join(); // wait for the thread to finish

// get result
// add result
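A runnable version of this raw-thread approach might look like the sketch below. `ComputeThread` is a hypothetical class playing the role of `Mythread`, and squaring an int stands in for compute(object). Note that it starts one thread per task with no upper bound, which is the main reason executors are usually preferred.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Thread subclass playing the role of Mythread: it runs the
// per-item work and keeps the result for the caller to read after join().
class ComputeThread extends Thread {
    private final int input;
    private int result;

    ComputeThread(int input) {
        this.input = input;
    }

    @Override
    public void run() {
        result = input * input; // stand-in for compute(object)
    }

    int getResult() {
        return result; // safe to read after join(): join() makes the write visible
    }
}

class RawThreads {
    static List<Integer> computeAll(List<Integer> inputs) throws InterruptedException {
        List<ComputeThread> threads = new ArrayList<>();
        for (int x : inputs) {
            ComputeThread t = new ComputeThread(x);
            t.start(); // one new thread per task
            threads.add(t);
        }
        List<Integer> results = new ArrayList<>();
        for (ComputeThread t : threads) {
            t.join(); // block until this thread has finished
            results.add(t.getResult());
        }
        return results;
    }
}
```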

EDIT: I think the other solutions are cooler.

Upvotes: 6

Michael Barker

Reputation: 14378

Fork/Join's ParallelArray is one option.
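ParallelArray lived in the jsr166y/extra166y preview library and never shipped with the JDK, but the Fork/Join framework underneath it did (java.util.concurrent, Java 7). A minimal divide-and-conquer sketch with RecursiveTask might look like this; `computeOne()` is a hypothetical CPU-bound stand-in for compute(object), and the threshold of 4 is arbitrary. Fork/Join targets CPU-bound work; a blocking IO call inside a task occupies a pool worker for its whole duration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Divide-and-conquer over a list: split until a chunk is small, process small
// chunks sequentially, and concatenate the partial results in input order.
class ComputeAllTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 4; // chunk size handled sequentially (arbitrary)
    private final List<Integer> items;

    ComputeAllTask(List<Integer> items) {
        this.items = items;
    }

    // Hypothetical CPU-bound per-item work, standing in for compute(object).
    static int computeOne(int x) {
        return x * x;
    }

    @Override
    protected List<Integer> compute() {
        if (items.size() <= THRESHOLD) {
            List<Integer> out = new ArrayList<>();
            for (int x : items) {
                out.add(computeOne(x));
            }
            return out;
        }
        int mid = items.size() / 2;
        ComputeAllTask left = new ComputeAllTask(items.subList(0, mid));
        ComputeAllTask right = new ComputeAllTask(items.subList(mid, items.size()));
        left.fork();                                         // left half may run on another worker
        List<Integer> rightResult = right.compute();         // right half in the current thread
        List<Integer> result = new ArrayList<>(left.join()); // wait for the left half
        result.addAll(rightResult);
        return result;
    }

    static List<Integer> computeAll(List<Integer> items) {
        return ForkJoinPool.commonPool().invoke(new ComputeAllTask(items));
    }
}
```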

Upvotes: 0

David Rabinowitz

Reputation: 30448

You can use a ThreadPoolExecutor. Here is sample code: http://programmingexamples.wikidot.com/threadpoolexecutor (too long to reproduce here)
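For a quick idea of what constructing a ThreadPoolExecutor directly looks like, here is a short sketch. The pool sizes, the bounded queue of 100, and the CallerRunsPolicy are illustrative assumptions rather than recommendations, and doubling an int stands in for compute(object). A bounded queue plus CallerRunsPolicy means that when the pool is saturated, the submitting thread runs the task itself instead of queueing without limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ThreadPoolExecutorSketch {
    // Pool settings below are illustrative assumptions.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                4,                                    // core pool size
                4,                                    // maximum pool size
                60, TimeUnit.SECONDS,                 // keep-alive for threads above core size (none here)
                new ArrayBlockingQueue<>(100),        // bounded queue of waiting tasks
                new ThreadPoolExecutor.CallerRunsPolicy()); // when full, run the task in the caller's thread
    }

    static List<Integer> computeAll(List<Integer> inputs) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor pool = newPool();
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int x : inputs) {
                futures.add(pool.submit(() -> x * 2)); // stand-in for compute(object)
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // results in submission order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```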

Upvotes: 0
