Reputation: 1173
I have a simple code as below. This checks for alive status for a list of servers. Could you please let me know how this can be done in parallel using threading or any other suitable solutions.
List<Host> hosts = this.getAllHosts();
List<Host> aliveHosts = new ArrayList<>();
if (hosts != null && hosts.size() > 0) {
for (Host host : hosts) {
try {
if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
aliveHosts.add(host);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
return aliveHosts;
How can I call each getByName
in a thread and execute this in parallel at the same time. Currently each of them is having a timeout of 3 seconds. If there are 10 items then the total time would be 30 seconds. Can anyone give a solution so that this can be done in 3-8 seconds overall.
Upvotes: 2
Views: 1588
Reputation: 1
We can do parallelly using Future interface.
package test.basics;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestFutureTask {
private static final int TIMEOUT = 30000;
public static void main(String[] args) {
List<String> hosts = new ArrayList<String>();
hosts.add("127.0.0.1");
hosts.add("127.0.0.2");
hosts.add("127.0.0.3");
hosts.add("127.0.0.4");
hosts.add("127.0.0.5");
hosts.add("127.0.0.6");
List<String> aliveHosts = new ArrayList<>();
List<String> notAliveHosts = new ArrayList<>();
long stTime = System.currentTimeMillis();
System.out.println("Starting time " + stTime);
Map<String, Future> jobList = new HashMap<>();
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
for (String host : hosts) {
Future f = newCachedThreadPool.submit(new Callable<Boolean>() {
private String host;
@Override
public Boolean call() throws Exception {
return InetAddress.getByName(host).isReachable(TIMEOUT);
}
public Callable<Boolean> init(String host) {
this.host = host;
return this;
}
}.init(host));
jobList.put(host, f);
}
for (String host : jobList.keySet()) {
try {
if ((boolean) jobList.get(host).get()) {
aliveHosts.add(host);
} else {
notAliveHosts.add(host);
}
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Ending time : " + endTime);
System.out.println("Time taken :" + (endTime - stTime));
System.out.println("Alive hosts: " + aliveHosts);
System.out.println("Not alive hosts: " + notAliveHosts);
}
}
Sample output:
Starting time 1500570979858
Ending time : 1500571009872
Time taken :30014
Alive hosts: [127.0.0.1]
Not alive hosts: [127.0.0.6, 127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2]
Upvotes: 0
Reputation: 10964
In addition to the accepted Java8 answer you can actually control the level of concurrency quite easily by using a custom ForkJoinPool
:
final Predicate<Host> isAlive = h -> {
try {
return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT);
} catch (Exception e) {
return false;
}
};
final Callable<List<Host>> collectAliveHosts = () ->
hosts.stream().parallel().filter(isAlive).collect(Collectors.toList());
final ForkJoinPool threadPool = new ForkJoinPool(4);
final List<Host> aliveHosts = threadPool.submit(collectAliveHosts).get();
If you don't use a custom pool, the common ForkJoinPool
will be used, which is sized according to the number of cores/CPUs your current machine has. This pool is however used by the whole JVM. That is, if you submit long running tasks to the common pool, the whole application might suffer some performance degradation.
Upvotes: 0
Reputation: 36401
With Java 8 streams:
List<Host> aliveHosts = hosts.stream()
.parallel()
.filter(h -> {
try {
return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT)
} catch(Exception e) {
return false;
}
})
.collect(Collectors.toList());
Upvotes: 6
Reputation: 16039
Java 8 and ExecutorService
:
List<Host> hosts = this.getAllHosts();
List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<Host>());
ExecutorService executorService = Executors.newFixedThreadPool(10);
if (hosts != null && hosts.size() > 0) {
for (Host host : hosts) {
executorService.submit(() -> {
try {
if (InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
aliveHosts.add(host);
}
} catch (IOException e) {
// logger?
}
});
}
}
executorService.shutdown();
return aliveHosts;
Upvotes: 0
Reputation: 964
Non Java 8 way will look similar:
List<Host> hosts = this.getAllHosts();
Queue<Host> q = new ArrayBlockingQueue<>(hosts.size(), true, hosts);
ExecutorService ex = Executors.newFixedThreadPool(5);
List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<>());
while(!q.isEmpty()){
ex.submit(new Runnable() {
@Override
public void run() {
Host host = q.poll();
try {
if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
aliveHosts.add(host);
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
ex.shutdown();
}
Upvotes: 0
Reputation: 76551
Let's consider this threading example:
public class SimpleThreads {
// Display a message, preceded by
// the name of the current thread
static void threadMessage(String message) {
String threadName =
Thread.currentThread().getName();
System.out.format("%s: %s%n",
threadName,
message);
}
private static class MessageLoop
implements Runnable {
public void run() {
String importantInfo[] = {
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too"
};
try {
for (int i = 0;
i < importantInfo.length;
i++) {
// Pause for 4 seconds
Thread.sleep(4000);
// Print a message
threadMessage(importantInfo[i]);
}
} catch (InterruptedException e) {
threadMessage("I wasn't done!");
}
}
}
public static void main(String args[])
throws InterruptedException {
// Delay, in milliseconds before
// we interrupt MessageLoop
// thread (default one hour).
long patience = 1000 * 60 * 60;
// If command line argument
// present, gives patience
// in seconds.
if (args.length > 0) {
try {
patience = Long.parseLong(args[0]) * 1000;
} catch (NumberFormatException e) {
System.err.println("Argument must be an integer.");
System.exit(1);
}
}
threadMessage("Starting MessageLoop thread");
long startTime = System.currentTimeMillis();
Thread t = new Thread(new MessageLoop());
t.start();
threadMessage("Waiting for MessageLoop thread to finish");
// loop until MessageLoop
// thread exits
while (t.isAlive()) {
threadMessage("Still waiting...");
// Wait maximum of 1 second
// for MessageLoop thread
// to finish.
t.join(1000);
if (((System.currentTimeMillis() - startTime) > patience)
&& t.isAlive()) {
threadMessage("Tired of waiting!");
t.interrupt();
// Shouldn't be long now
// -- wait indefinitely
t.join();
}
}
threadMessage("Finally!");
}
}
In essence, you need a Runnable
which is responsible for the way your threads will work. You will need to instantiate a Thread, passing an instance of the Runnable
you have and then start
your Thread
. You will need to have all the Threads accessible and Join them. You can easily manage the timeout limits as well.
Upvotes: 0