angel_30

Reputation: 1

How to multi-thread my sequential Java code

I have a Java program that, given a list, runs several independent processes on each list item (each one retrieves text from some HTTP resources and inserts it into its own HashMap), and finally calculates some numbers from those HashMaps. The main snippet looks like this:

    for (int i = 0; i < mylist.size(); i++) {
        long startepoch = getTime(mylist.get(i).time);
        MyItem m = mylist.get(i);
        String index = (i + 1) + "";

        process1(index, m.name, startepoch, m.duration);
        // adds to hashmap1

        if (m.name.equals("TEST")) {
            process2(index, m.name, startepoch, m.duration);
            // adds to hashmap2
        } else {
            process3(index, m.name, startepoch, m.duration);
            // adds to hashmap3
            process4(index, m.name, startepoch, m.duration);
            // adds to hashmap4
            process5(index, m.name, startepoch, m.duration);
            // adds to hashmap5
            process6(index, m.name, startepoch, m.duration);
            // adds to hashmap6
        }
    }

    // then start calculation on all hashmaps
    calculate_all();

Because this snippet currently runs sequentially, it can take 30 minutes or so for a list of 500 items. How can I multi-thread my code to make it faster, and do so in a thread-safe manner?

I tried using ExecutorService executorService = Executors.newFixedThreadPool(10); and submitting each single process to the executorService by wrapping it as shown below, but the problem was that I couldn't tell when they had all finished, so I didn't know when to call calculate_all(). So I didn't continue.

            executorService.submit(new Runnable() {
                public void run() {
                    process2(index, m.name, startepoch, m.duration);
                }
            });

Any better working ideas?

Upvotes: 0

Views: 147

Answers (2)

user7340499

Please note that multi-threading does not necessarily increase speed. Its main benefit is reducing idle CPU cycles, for example by letting other work proceed while one thread is blocked or sleeping.

I can't give much specific help based on what you have provided; however, I think you can start by doing something like this:

  1. Use thread-safe data structures. This is a must. If you miss this step, your software will eventually break, and you will have a very hard time finding the cause. (e.g. if you have an ArrayList, use a thread-safe alternative such as CopyOnWriteArrayList or Collections.synchronizedList; for a HashMap, ConcurrentHashMap is the usual choice)
  2. You can start trying out multi-threading by removing the for loop and using a thread for each execution instead. If your loop has more iterations than you have threads, you will have to queue the remaining work.
  3. You have a final calculation that requires all other threads to finish. You can use a CountDownLatch, wait()/notifyAll() or a synchronized block, depending on your implementation.
  4. Execute your final calculation.
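
The four steps above can be sketched roughly as follows. This is only a minimal illustration, not your actual code: the class name LatchSketch, the method runAll and the stand-in process1 (which just upper-cases a string instead of calling an HTTP resource) are all assumptions made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LatchSketch {

    // Hypothetical stand-in for process1(...); the real one fetches text over HTTP.
    static String process1(String name) {
        return name.toUpperCase();
    }

    static Map<String, String> runAll(List<String> names, int threads) {
        // Step 1: a thread-safe map instead of a plain HashMap.
        Map<String, String> hashmap1 = new ConcurrentHashMap<>();
        // Step 2: a fixed pool queues the items that exceed the thread count.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(names.size());
        try {
            for (int i = 0; i < names.size(); i++) {
                final String index = String.valueOf(i + 1);
                final String name = names.get(i);
                pool.submit(() -> {
                    try {
                        hashmap1.put(index, process1(name));
                    } finally {
                        latch.countDown(); // count down even if the task throws
                    }
                });
            }
            latch.await(); // Step 3: block until every task has counted down.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return hashmap1; // Step 4: the final calculation can safely run now.
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "b", "TEST"), 10));
    }
}
```

Counting down in a finally block matters here: if a task threw before reaching countDown(), await() would never return.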

EDIT

In response to (2):

Your current execution is this:

for (int i = 0; i < mylist.size(); i++) {
    some_processes();
}

// then start calculation on all hashmaps
calculate_all();

Now, to remove the "for" loop, you can first start by splitting it into several smaller "for" loops, e.g.:

// Assuming mylist.size() is around 500 and you want, say, 5 hardcoded threads
Thread_1:
for (int i = 0; i < 100; i++) {
    some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
    some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
    some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
    some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
    some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

There are a couple of disadvantages of this method as you can see. For example, what if the list size becomes, say 380? Then you have an idle thread. Also, what if you want more than 5 threads?

So at this point, you can keep increasing the number of "for" loops while making each one cover fewer and fewer items. In the limit, each loop handles a single item, so the number of loops equals the number of threads and your original "for" loop effectively disappears. Technically, that requires mylist.size() threads. You can implement this as follows:

// Allow a maximum number of threads, say mylist.size(). I used LinkedBlockingDeque
// here because you might choose something lower than mylist.size().
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());

new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(blockingQueue, latch)).start();
new Thread(new take_finished_processes_from_queue(blockingQueue)).start();
try {
    latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
    e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();

Notice that with this arrangement, we have removed your initial "for" loop and instead created another one that just submits new threads as the queue is emptied. You can check BlockingQueue examples with producer and consumer applications. For example see: BlockingQueue examples
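
For reference, a producer/consumer arrangement along these lines might look like the sketch below. It is a simplified variant, not the exact classes named above: the main thread acts as the producer, a few worker threads act as consumers, and QueueSketch, runAll, process and the STOP sentinel are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueSketch {
    private static final int STOP = -1; // sentinel index telling a worker to exit

    // Hypothetical stand-in for the per-item work (the real one does HTTP calls).
    static String process(String name) {
        return name.toUpperCase();
    }

    static Map<String, String> runAll(List<String> names, int workers) {
        // Bounded queue: the producer blocks when the workers fall behind.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(workers * 2);
        CountDownLatch latch = new CountDownLatch(names.size());
        Map<String, String> results = new ConcurrentHashMap<>();

        // Consumers: each worker takes list indices until it sees STOP.
        for (int w = 0; w < workers; w++) {
            new Thread(() -> {
                try {
                    while (true) {
                        int i = queue.take(); // blocks while the queue is empty
                        if (i == STOP) {
                            return;
                        }
                        try {
                            results.put(String.valueOf(i + 1), process(names.get(i)));
                        } finally {
                            latch.countDown();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        // Producer: enqueue every index, then one STOP per worker.
        try {
            for (int i = 0; i < names.size(); i++) {
                queue.put(i); // blocks while the queue is full
            }
            for (int w = 0; w < workers; w++) {
                queue.put(STOP);
            }
            latch.await(); // wait until every item has been processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "b", "TEST"), 3));
    }
}
```

With this layout the number of workers is independent of the list size, so a list of 380 items no longer leaves a hardcoded thread idle.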

EDIT 2

A simple implementation of Future might be as shown:

ExecutorService executorService = Executors.newCachedThreadPool();
Future<?> future1, future2, future3, future4, future5, future6;

for (int i = 0; i < mylist.size(); i++) {
    long startepoch = getTime(mylist.get(i).time);
    MyItem m = mylist.get(i);
    String index = (i + 1) + "";

    future1 = executorService.submit(new Callable<Void>() {...});
    // adds to hashmap1

    future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    if (m.name.equals("TEST")) {
        future2 = executorService.submit(new Callable<Void>() {...});
        // adds to hashmap2

        future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.

    } else {
        future3 = executorService.submit(new Callable<Void>() {...});
        // adds to hashmap3
        future4 = executorService.submit(new Callable<Void>() {...});
        // adds to hashmap4
        future5 = executorService.submit(new Callable<Void>() {...});
        // adds to hashmap5
        future6 = executorService.submit(new Callable<Void>() {...});
        // adds to hashmap6

        // Add extra future.get() calls here as above; without them,
        // calculate_all() may run before these tasks have finished.
    }
}

// then start calculation on all hashmaps
calculate_all();

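Don't forget to add a try-catch block. Future.get() throws the checked exceptions ExecutionException and InterruptedException, so the code will not compile unless they are caught or declared, and a failed task would otherwise go unhandled.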

// Example try-catch block surrounding a Future.get().
try {
    Object result = future.get();
} catch (ExecutionException e) {
    //Do something
} catch (InterruptedException e) {
    //Do something
}

However, you can have a more complex one as shown here. That link also explains Thilo's answer.
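
As a rough illustration of the try-catch advice, the sketch below keeps every Future in a list, waits on each one, and counts the tasks that failed instead of crashing. The class FutureSketch, the method runAndCountFailures and the pool size of 4 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FutureSketch {

    // Submits every task, waits for all of them, and returns how many failed.
    static int runAndCountFailures(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> futures = new ArrayList<>();
        for (Callable<String> task : tasks) {
            futures.add(pool.submit(task)); // returns immediately
        }

        int failures = 0;
        try {
            for (Future<String> f : futures) {
                try {
                    f.get(); // blocks until this particular task is done
                } catch (ExecutionException e) {
                    // The exception thrown inside the task is available as the cause.
                    failures++;
                    System.err.println("task failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(() -> "ok");
        tasks.add(() -> { throw new IllegalArgumentException("boom"); });
        System.out.println("failures: " + runAndCountFailures(tasks));
    }
}
```

Because all tasks are submitted before the first get(), they still run in parallel; calling get() right after each submit would make the loop sequential again.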

Upvotes: 1

Thilo

Reputation: 262474

but the problem was I couldn't know when they finish

When you submit something to an Executor, you get back a Future with the result (if any).

You can then call Future::get from your main thread to wait for these results (or just the completion in your case).

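// assuming tasks is a List<Callable<Void>>; invokeAll blocks until every task has completed
List<Future<Void>> completions = executor.invokeAll(tasks);

// get() does not block any more at this point, but it rethrows a task's failure as an ExecutionException
for (Future<Void> c : completions) c.get();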

Another thing you need to take care of is how to store the results. If you plan to have your tasks place them into some shared data structure, make sure to make that thread-safe. It is probably easier to change from Runnable to Callable so that the tasks can return a result (which you can later merge in a single-threaded way on the main thread).
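
A minimal sketch of that Callable approach could look like the following. The names InvokeAllSketch, runAll and fetch are hypothetical, and fetch only fabricates a string where the real task would retrieve text over HTTP. Each task returns its result, and the main thread merges the results into a plain HashMap, so no shared structure is touched by the workers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllSketch {

    // Hypothetical stand-in for one of the processN(...) methods.
    static String fetch(String name) {
        return "text for " + name;
    }

    static Map<String, String> runAll(List<String> names) {
        // One Callable per list item; each returns its result instead of storing it.
        List<Callable<String>> tasks = new ArrayList<>();
        for (String name : names) {
            tasks.add(() -> fetch(name));
        }

        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            // invokeAll blocks until all tasks are done and returns the futures
            // in the same order as the task list.
            List<Future<String>> completions = executor.invokeAll(tasks);

            // Single-threaded merge on the calling thread: a plain HashMap is enough.
            Map<String, String> merged = new HashMap<>();
            for (int i = 0; i < completions.size(); i++) {
                merged.put(String.valueOf(i + 1), completions.get(i).get());
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "b", "TEST")));
    }
}
```

Once runAll returns, calculate_all() can be called directly, since every result is already in the merged map.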

Upvotes: 4
