Reputation: 1
I have a Java program that, given a list, performs some independent processes on each list item (each process retrieves text from some HTTP resources and inserts it into its own HashMap), and finally calculates some numbers from those HashMaps. The main snippet looks like this:
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
process1(index, m.name, startepoch, m.duration);
//adds to hashmap1
if(m.name.equals("TEST")) {
process2(index, m.name, startepoch, m.duration);
//adds to hashmap2
} else {
process3(index, m.name, startepoch, m.duration);
//adds to hashmap3
process4(index, m.name, startepoch, m.duration);
//adds to hashmap4
process5(index, m.name, startepoch, m.duration);
//adds to hashmap5
process6(index, m.name, startepoch, m.duration);
//adds to hashmap6
}
}
// then start calculation on all hashmaps
calculate_all();
Because this snippet currently runs sequentially, it can take 30 minutes or so for a list of 500 items. How can I multi-thread my code to make it faster, and do so in a thread-safe manner?
I tried using ExecutorService executorService = Executors.newFixedThreadPool(10); and then submitting each single process to the executorService by wrapping it as shown below, but the problem was that I couldn't tell when they had all finished so that I could call calculate_all(). So I didn't continue.
executorService.submit(new Runnable() {
public void run() {
process2(index, m.name, startepoch, m.duration);
}
});
Any better working ideas?
Upvotes: 0
Views: 147
Reputation:
Please note that multi-threading does not necessarily increase speed. It mainly helps by reducing idle CPU cycles, for example while one thread is blocked sleeping or waiting on I/O such as an HTTP request.
There is not much I can help with based on what you have provided; however, I think you can start by doing something like this:
EDIT
In response to (2):
Your current execution is this:
for (int i = 0; i < mylist.size(); i++) {
some_processes();
}
// then start calculation on all hashmaps
calculate_all();
Now, to remove the "for" loop, you can start by splitting it into several "for" loops that each cover part of the list, e.g.:
// Assuming mylist.size() is around 500 and you want, say, 5 hardcoded threads
Thread_1:
for (int i = 0; i < 100; i++) {
some_processes();
}
Thread_2:
for (int i = 100; i < 200; i++) {
some_processes();
}
Thread_3:
for (int i = 200; i < 300; i++) {
some_processes();
}
Thread_4:
for (int i = 300; i < 400; i++) {
some_processes();
}
Thread_5:
for (int i = 400; i < mylist.size(); i++) {
some_processes();
}
// Now you can use these threads as such:
CountDownLatch latch = new CountDownLatch(5);
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.submit(new Thread1(latch));
executor.submit(new Thread2(latch));
executor.submit(new Thread3(latch));
executor.submit(new Thread4(latch));
executor.submit(new Thread5(latch));
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
There are a couple of disadvantages of this method as you can see. For example, what if the list size becomes, say 380? Then you have an idle thread. Also, what if you want more than 5 threads?
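One way around the hardcoded ranges is to compute them from the list size. The following is only a minimal sketch under assumptions: `ChunkedLatchSketch`, `runInChunks` and `someProcesses` are hypothetical names, and the value stored per item is a placeholder for whatever the real processes put into their maps. It also uses a ConcurrentHashMap, since a plain HashMap is not safe to write from several threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ChunkedLatchSketch {
    // Hypothetical stand-in for the per-item work (process1..process6 in the question).
    static void someProcesses(int i, Map<String, Integer> results) {
        results.put(String.valueOf(i + 1), i * 2);
    }

    static Map<String, Integer> runInChunks(int itemCount, int threadCount)
            throws InterruptedException {
        Map<String, Integer> results = new ConcurrentHashMap<>(); // safe for concurrent writes
        if (itemCount == 0) {
            return results;
        }
        int chunkSize = (itemCount + threadCount - 1) / threadCount; // ceiling division
        int chunkCount = (itemCount + chunkSize - 1) / chunkSize;    // never more than threadCount
        CountDownLatch latch = new CountDownLatch(chunkCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int c = 0; c < chunkCount; c++) {
            final int from = c * chunkSize;
            final int to = Math.min(from + chunkSize, itemCount);
            executor.submit(() -> {
                try {
                    for (int i = from; i < to; i++) {
                        someProcesses(i, results);
                    }
                } finally {
                    latch.countDown(); // count down even if a process throws
                }
            });
        }
        latch.await();       // wait until every chunk has finished
        executor.shutdown(); // no more tasks will be submitted
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> results = runInChunks(380, 5);
        System.out.println(results.size());
    }
}
```

With 380 items and 5 threads, each chunk covers 76 items, so no thread is left without work and no range runs past the end of the list.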
So at this point, you can further increase the number of "for" loops by making each one cover fewer and fewer items. At the limit, "for loop count" == "thread count" == mylist.size(), so each loop handles a single item, which effectively removes your "for" loop. So technically, you need mylist.size() threads. You can implement this as follows:
// Allow a maximum amount of threads, say mylist.size(). I used LinkedBlockingDeque here because you might choose something lower than mylist.size().
BlockingQueue<String> queue = new LinkedBlockingDeque<>(mylist.size());
CountDownLatch latch = new CountDownLatch(mylist.size());
new Thread(new add_some_processes_w_single_loop_for_loop_to_queue(queue, latch)).start();
new Thread(new take_finished_processes_from_queue(queue)).start();
try {
latch.await(); // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}
// then start calculation on all hashmaps
calculate_all();
Notice that with this arrangement, we have removed your initial "for" loop and instead created another one that just submits new threads as the queue is emptied. You can check BlockingQueue examples with producer and consumer applications. For example see: BlockingQueue examples
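As a rough illustration of the producer and consumer arrangement, here is a minimal sketch. It is not the code behind the class names used above: `QueueWorkersSketch`, `run`, the `POISON` sentinel and the `"processed-"` placeholder result are all assumptions made for the example. A producer puts item indexes on a bounded queue, a fixed number of worker threads take them off, and a CountDownLatch sized to the item count tells the main thread when everything is done.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorkersSketch {
    static final int POISON = -1; // sentinel value telling a worker to stop

    static Map<Integer, String> run(int itemCount, int workerCount)
            throws InterruptedException {
        // Bounded queue: put() blocks when it is full, so the producer cannot run far ahead.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16);
        Map<Integer, String> results = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(itemCount);

        // Consumers: each worker takes indexes until it sees the sentinel.
        for (int w = 0; w < workerCount; w++) {
            new Thread(() -> {
                try {
                    while (true) {
                        int i = queue.take();
                        if (i == POISON) {
                            return;
                        }
                        try {
                            results.put(i, "processed-" + i); // placeholder for the real work
                        } finally {
                            latch.countDown();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and exit
                }
            }).start();
        }

        // Producer: enqueue every item index, then one sentinel per worker.
        for (int i = 0; i < itemCount; i++) {
            queue.put(i);
        }
        for (int w = 0; w < workerCount; w++) {
            queue.put(POISON);
        }

        latch.await(); // every item has been processed once this returns
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(500, 5).size());
    }
}
```

The thread count here is independent of the list size, which avoids creating one thread per item.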
EDIT 2
A simple implementation using Future might be as shown:
ExecutorService executorService = Executors.newCachedThreadPool();
Future future1, future2, future3, future4, future5, future6;
for (int i = 0; i < mylist.size(); i++) {
long startepoch = getTime(mylist.get(i).time);
MyItem m = mylist.get(i);
String index=(i+1)+"";
future1 = executorService.submit(new Callable() {...});
//adds to hashmap1
future1.get(); // Add this if you need to wait for process1 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
if(m.name.equals("TEST")) {
future2 = executorService.submit(new Callable() {...});
//adds to hashmap2
future2.get(); // Add this if you need to wait for process2 to finish before moving on to others. Also, add a try{}catch{} block as shown below.
} else {
future3 = executorService.submit(new Callable() {...});
//adds to hashmap3
future4 = executorService.submit(new Callable() {...});
//adds to hashmap4
future5 = executorService.submit(new Callable() {...});
//adds to hashmap5
future6 = executorService.submit(new Callable() {...});
//adds to hashmap6
// Add extra future.get here as above...
}
}
}
// then, once every submitted Future has completed, start calculation on all hashmaps
calculate_all();
Don't forget to add a try-catch block around Future.get(); otherwise an exception thrown by a task may go unhandled and crash your program.
// Example try-catch block surrounding a Future.get().
try {
Object result = future.get();
} catch (ExecutionException e) {
//Do something
} catch (InterruptedException e) {
//Do something
}
However, you can have a more complex one as shown here. That link also explains Thilo's answer.
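If the processes do not depend on each other, calling get() right after each submit makes the loop sequential again. A common alternative is to keep every Future in a list and wait for all of them only once, just before the calculation. The sketch below assumes exactly that; `WaitForAllFuturesSketch`, `runAll` and this simplified `process1` (which stores the name length) are hypothetical stand-ins, not the question's real methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WaitForAllFuturesSketch {
    // Hypothetical stand-in for process1: stores a placeholder value in the shared map.
    static void process1(String index, String name, Map<String, Integer> hashmap1) {
        hashmap1.put(index, name.length());
    }

    static Map<String, Integer> runAll(List<String> names)
            throws InterruptedException, ExecutionException {
        Map<String, Integer> hashmap1 = new ConcurrentHashMap<>(); // written by many threads
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < names.size(); i++) {
                final String index = String.valueOf(i + 1);
                final String name = names.get(i);
                // Submit without waiting, so the tasks can run in parallel.
                futures.add(executorService.submit(() -> process1(index, name, hashmap1)));
            }
            // Wait for every task; get() rethrows a task failure as ExecutionException.
            for (Future<?> future : futures) {
                future.get();
            }
        } finally {
            executorService.shutdown();
        }
        // At this point it is safe to run the calculation (calculate_all() in the question).
        return hashmap1;
    }

    public static void main(String[] args) throws Exception {
        List<String> names = new ArrayList<>();
        names.add("TEST");
        names.add("abc");
        System.out.println(runAll(names).size());
    }
}
```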
Upvotes: 1
Reputation: 262474
but the problem was I couldn't know when they finish
When you submit something to an ExecutorService, you get back a Future with the result (if any). You can then call Future::get from your main thread to wait for these results (or just the completion in your case).
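// tasks is a Collection<Callable<Void>>; invokeAll blocks until every task has completed
List<Future<Void>> completions = executor.invokeAll(tasks);
// get() returns at once for a finished task, but rethrows anything the task threw
for (Future<Void> c : completions) c.get();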
Another thing you need to take care of is how to store the results. If you plan to have your tasks place them into some shared data structure, make sure to make that thread-safe. It is probably easier to change from Runnable to Callable so that the tasks can return a result (which you can later merge in a single-threaded way on the main thread).
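A minimal sketch of that Callable approach follows. The names `CallableMergeSketch`, `run` and `fetchText` are hypothetical, and `fetchText` merely builds a string where the real code would retrieve text over HTTP. Each task returns one key/value pair, and only the main thread touches the HashMap, so no synchronization is needed on it.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableMergeSketch {
    // Hypothetical stand-in for the HTTP retrieval done by each process.
    static String fetchText(String name) {
        return "text-for-" + name;
    }

    static Map<String, String> run(List<String> names)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            List<Callable<Map.Entry<String, String>>> tasks = new ArrayList<>();
            for (int i = 0; i < names.size(); i++) {
                final String index = String.valueOf(i + 1);
                final String name = names.get(i);
                // Each task returns its result instead of writing to shared state.
                tasks.add(() -> new AbstractMap.SimpleEntry<String, String>(index, fetchText(name)));
            }
            // invokeAll blocks until all tasks have completed.
            List<Future<Map.Entry<String, String>>> futures = executor.invokeAll(tasks);

            // Merge on the calling thread only, so a plain HashMap is fine here.
            Map<String, String> merged = new HashMap<>();
            for (Future<Map.Entry<String, String>> future : futures) {
                Map.Entry<String, String> entry = future.get(); // rethrows task failures
                merged.put(entry.getKey(), entry.getValue());
            }
            return merged;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> names = new ArrayList<>();
        names.add("TEST");
        names.add("other");
        System.out.println(run(names));
    }
}
```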
Upvotes: 4