Shamnad P S

Reputation: 1173

Thread implementation of a for loop iteration over a list

I have the simple code below, which checks whether each server in a list is alive. How can this be done in parallel, using threads or any other suitable approach?

        List<Host> hosts = this.getAllHosts();
        List<Host> aliveHosts = new ArrayList<>();
        if (hosts != null && hosts.size() > 0) {
            for (Host host : hosts) {
                try {
                    if(InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                        aliveHosts.add(host);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return aliveHosts;

How can I run each getByName/isReachable call in its own thread so that the checks execute in parallel? Each check currently has a timeout of 3 seconds, so with 10 hosts the total time can reach 30 seconds. Is there a solution that brings the overall time down to 3-8 seconds?

Upvotes: 2

Views: 1588

Answers (6)

Santhosh S

Reputation: 1

We can run the checks in parallel using the Future interface.

package test.basics;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class TestFutureTask {
    private static final int TIMEOUT = 30000;

    public static void main(String[] args) {
        List<String> hosts = new ArrayList<String>();
        hosts.add("127.0.0.1");
        hosts.add("127.0.0.2");
        hosts.add("127.0.0.3");
        hosts.add("127.0.0.4");
        hosts.add("127.0.0.5");
        hosts.add("127.0.0.6");
        List<String> aliveHosts = new ArrayList<>();
        List<String> notAliveHosts = new ArrayList<>();

        long stTime = System.currentTimeMillis();
        System.out.println("Starting time " + stTime);
        // Typed futures avoid raw-type warnings when the results are read back.
        Map<String, Future<Boolean>> jobList = new HashMap<>();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        for (String host : hosts) {

            Future<Boolean> f = newCachedThreadPool.submit(new Callable<Boolean>() {

                private String host;

                @Override
                public Boolean call() throws Exception {
                    return InetAddress.getByName(host).isReachable(TIMEOUT);
                }

                public Callable<Boolean> init(String host) {
                    this.host = host;
                    return this;
                }
            }.init(host));

            jobList.put(host, f);

        }

        for (String host : jobList.keySet()) {
            try {
                if ((boolean) jobList.get(host).get()) {
                    aliveHosts.add(host);
                } else {
                    notAliveHosts.add(host);
                }
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
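        }
        // All results are in; release the pool threads so the JVM can exit promptly.
        newCachedThreadPool.shutdown();
        long endTime = System.currentTimeMillis();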
        System.out.println("Ending time : " + endTime);
        System.out.println("Time taken :" + (endTime - stTime));
        System.out.println("Alive hosts: " + aliveHosts);
        System.out.println("Not alive hosts: " + notAliveHosts);
    }
}

Sample output:

Starting time 1500570979858

Ending time : 1500571009872

Time taken :30014

Alive hosts: [127.0.0.1]

Not alive hosts: [127.0.0.6, 127.0.0.5, 127.0.0.4, 127.0.0.3, 127.0.0.2]

Upvotes: 0

dpr

Reputation: 10964

In addition to the accepted Java 8 answer, you can control the level of concurrency quite easily by using a custom ForkJoinPool:

final Predicate<Host> isAlive = h -> {
    try {
        return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT);
    } catch (Exception e) {
        return false;
    }
};
final Callable<List<Host>> collectAliveHosts = () ->
    hosts.stream().parallel().filter(isAlive).collect(Collectors.toList());

final ForkJoinPool threadPool = new ForkJoinPool(4);
final List<Host> aliveHosts = threadPool.submit(collectAliveHosts).get();

If you don't use a custom pool, the common ForkJoinPool is used, which is sized according to the number of cores/CPUs of the machine. That pool is shared by the whole JVM, however, so if you submit long-running tasks to it, the whole application might suffer some performance degradation.
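To make the difference between the two pools visible, here is a minimal sketch. The class name `CustomPoolDemo` and the helper `filterInPool` are made up for illustration, and a plain integer filter stands in for the reachability check. Note that a parallel stream running in the pool it was started from is widely relied upon, but it is observed behaviour rather than something the Stream documentation promises.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CustomPoolDemo {

    // Hypothetical helper: filters `items` with a parallel stream that is
    // started from inside `pool` instead of from the calling thread.
    static <T> List<T> filterInPool(ForkJoinPool pool, List<T> items, Predicate<T> check)
            throws Exception {
        // An explicit Callable avoids any overload ambiguity on submit().
        Callable<List<T>> job = () ->
            items.parallelStream().filter(check).collect(Collectors.toList());
        return pool.submit(job).get();
    }

    public static void main(String[] args) throws Exception {
        // The common pool is shared by everything in the JVM.
        System.out.println("Common pool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());

        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("Custom pool parallelism: " + pool.getParallelism());
            // Stand-in predicate; a real check would call isReachable here.
            List<Integer> even =
                filterInPool(pool, Arrays.asList(1, 2, 3, 4, 5, 6), n -> n % 2 == 0);
            System.out.println(even);
        } finally {
            pool.shutdown(); // a custom pool should be released when no longer needed
        }
    }
}
```

For checks that mostly wait on the network, a pool larger than the number of cores is usually reasonable, since the threads spend their time blocked rather than computing.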

Upvotes: 0

Jean-Baptiste Yunès

Reputation: 36401

With Java 8 streams:

List<Host> aliveHosts = hosts.stream()
                             .parallel()
                             .filter(h -> {
                                 try {
                                     return InetAddress.getByName(h.getIpaddress()).isReachable(TIMEOUT);
                                 } catch (Exception e) {
                                     // Treat any failure as "not alive".
                                     return false;
                                 }
                             })
                             .collect(Collectors.toList());

Upvotes: 6

Adam Siemion

Reputation: 16039

Java 8 and ExecutorService:

List<Host> hosts = this.getAllHosts();
List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<Host>());
ExecutorService executorService = Executors.newFixedThreadPool(10);
if (hosts != null && hosts.size() > 0) {
    for (Host host : hosts) {
        executorService.submit(() -> {
            try {
                if (InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                    aliveHosts.add(host);
                }
            } catch (IOException e) {
                // logger?
            }
        });
    }
}
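executorService.shutdown();
try {
    // Wait for the submitted checks to finish; without this the list is
    // returned before the worker threads have filled it. The one-minute
    // bound is an arbitrary choice (needs java.util.concurrent.TimeUnit).
    executorService.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}
return aliveHosts;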

Upvotes: 0

Aleydin Karaimin

Reputation: 964

A non-Java 8 version looks similar:

List<Host> hosts = this.getAllHosts();

final Queue<Host> q = new ArrayBlockingQueue<>(hosts.size(), true, hosts);
ExecutorService ex = Executors.newFixedThreadPool(5);
final List<Host> aliveHosts = Collections.synchronizedList(new ArrayList<Host>());

// Submit exactly one task per host. Looping on q.isEmpty() here would keep
// submitting tasks until the workers drained the queue, and the surplus
// tasks would then get null from q.poll().
for (int i = 0; i < hosts.size(); i++) {
    ex.submit(new Runnable() {
        @Override
        public void run() {
            Host host = q.poll();
            if (host == null) {
                return;
            }
            try {
                if (InetAddress.getByName(host.getIpaddress()).isReachable(TIMEOUT)) {
                    aliveHosts.add(host);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });
}
ex.shutdown();

Upvotes: 0

Lajos Arpad

Reputation: 76551

Let's consider this threading example:

public class SimpleThreads {

    // Display a message, preceded by
    // the name of the current thread
    static void threadMessage(String message) {
        String threadName =
            Thread.currentThread().getName();
        System.out.format("%s: %s%n",
                          threadName,
                          message);
    }

    private static class MessageLoop
        implements Runnable {
        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            try {
                for (int i = 0;
                     i < importantInfo.length;
                     i++) {
                    // Pause for 4 seconds
                    Thread.sleep(4000);
                    // Print a message
                    threadMessage(importantInfo[i]);
                }
            } catch (InterruptedException e) {
                threadMessage("I wasn't done!");
            }
        }
    }

    public static void main(String args[])
        throws InterruptedException {

        // Delay, in milliseconds before
        // we interrupt MessageLoop
        // thread (default one hour).
        long patience = 1000 * 60 * 60;

        // If command line argument
        // present, gives patience
        // in seconds.
        if (args.length > 0) {
            try {
                patience = Long.parseLong(args[0]) * 1000;
            } catch (NumberFormatException e) {
                System.err.println("Argument must be an integer.");
                System.exit(1);
            }
        }

        threadMessage("Starting MessageLoop thread");
        long startTime = System.currentTimeMillis();
        Thread t = new Thread(new MessageLoop());
        t.start();

        threadMessage("Waiting for MessageLoop thread to finish");
        // loop until MessageLoop
        // thread exits
        while (t.isAlive()) {
            threadMessage("Still waiting...");
            // Wait maximum of 1 second
            // for MessageLoop thread
            // to finish.
            t.join(1000);
            if (((System.currentTimeMillis() - startTime) > patience)
                  && t.isAlive()) {
                threadMessage("Tired of waiting!");
                t.interrupt();
                // Shouldn't be long now
                // -- wait indefinitely
                t.join();
            }
        }
        threadMessage("Finally!");
    }
}

Source.

In essence, you need a Runnable that defines the work each thread performs. Instantiate a Thread, passing it an instance of that Runnable, and start it. Keep references to all the threads so that you can join them. Timeout limits are easy to manage as well, for example with the timed join(long) used above.
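Applied to a list of hosts, those steps might look like the sketch below. It is only an illustration: the class `JoinAllThreads`, the helper `filterWithThreads`, and `simulatedCheck` are hypothetical names, and the simulated check sleeps instead of touching the network, so a real version would call isReachable in its place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

public class JoinAllThreads {

    // Runs `check` for every item on its own thread and returns the items that passed.
    static <T> List<T> filterWithThreads(List<T> items, Predicate<T> check)
            throws InterruptedException {
        List<T> passed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        for (T item : items) {
            Thread t = new Thread(() -> {
                if (check.test(item)) {
                    passed.add(item);
                }
            });
            threads.add(t); // keep every thread accessible so it can be joined
            t.start();
        }

        for (Thread t : threads) {
            t.join(); // block until this worker has finished
        }
        return passed;
    }

    // Hypothetical stand-in for the network check: sleeps, then reports only 127.0.0.1 as alive.
    static boolean simulatedCheck(String host) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return host.equals("127.0.0.1");
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> hosts = Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3");
        long start = System.currentTimeMillis();
        List<String> alive = filterWithThreads(hosts, JoinAllThreads::simulatedCheck);
        long elapsed = System.currentTimeMillis() - start;
        // The checks overlap, so the elapsed time is close to one check, not three.
        System.out.println("Alive: " + alive + " after about " + elapsed + " ms");
    }
}
```

One thread per host is fine for a handful of hosts; for long lists a bounded pool, as in the ExecutorService answers, keeps the thread count under control.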

Upvotes: 0
