Reputation: 81
Using java 8
I'm trying to write a prog to download log files from diff servers and search a given text in these log files. I'm doing is synchronously right now. I want to do it in parallel and found out that it can be done using Future in java. I'm using apache.commons.io for downloading file from URL. Here is code snippet:
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<XCluster>> clusterFutures = new ArrayList<>();
for(XCluster cluster: clusters) {
clusterFutures.add(executorService.submit(() -> {
return downloadAndSearch(textToSearch, cluster);
}));
}
//For now I'm not doing anything with returned value from Future
But now I want to terminate other download-search operation started under Future as given search is expected to be found in only one of the servers. So there is no need to continue on other Future tasks which I started. can any one suggest a way to do this? I'm using java 8, other options are also welcome. Thanks In Advance!
Upvotes: 1
Views: 186
Reputation: 517
The ExecutorService has a shutdownNow method which will stop all threads and shut down the service.
Edit:
I made some experiments with shutDownNow and as I see it can't stop the threads as I thought. AFAIK It uses interrupts() but not all thread react to interrupt.
So the best alternative I can come up with:
First, create an Indicator class:
public static class Indicator{
private boolean isReady = false;
public void ready(){
isReady = true;
}
public boolean isReady(){
return isReady;
}
}
The threads you start should share one Indicator instance to communicate. So you can create a Callable like this:
public static class Processor implements Callable<Integer> {
private volatile Indicator indicator;
private Integer number;
public Processor(Integer integer, Indicator isReady){
this.number = integer;
this.indicator = isReady;
}
@Override
public Integer call() throws Exception {
System.out.println("Thread started:" + Thread.currentThread().getId());
int counter = 0;
while (!indicator.isReady &&counter < number) {
// Make complicated things
Math.sin(counter);
counter++;
}
if(indicator.isReady){
//another thread finished
//delete resources
System.out.println("Thread interrupted: " + Thread.currentThread().getId() + " " + counter);
return -1;
} else {
System.out.println("Thread finished: " + Thread.currentThread().getId() + " " + counter);
indicator.ready();
return counter;
}
}
}
This way when the first thread is ready it can stop the others and they clean up after himselves.
I tried this as follows:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<Integer>> clusterFutures = new ArrayList<>();
Indicator indicator = new Indicator();
clusterFutures.add(executorService.submit(new Processor(100, indicator)));
clusterFutures.add(executorService.submit(new Processor(10000, indicator)));
clusterFutures.add(executorService.submit(new Processor(10000000,indicator)));
}
A sample output:
Thread started:11
Thread started:12
Thread finished: 11 100
Thread interrupted: 12 1001
Thread started:13
Thread interrupted: 13 0
Sidenote: the referenced classes don't have to be static inner classes just it was easier to make experiments in one file.
Upvotes: 1
Reputation: 7166
In terms of code, the simplest solution is to have a shutdown thread that cancels all the futures:
final ExecutorService executorService = Executors.newCachedThreadPool();
final ExecutorService shutdownService = Executors.newSingleThreadExecutor();
List<Future<XCluster>> clusterFutures = new ArrayList<>();
for(XCluster cluster: clusters) {
clusterFutures.add(executorService.submit(() -> {
boolean cancelOthers = false;
try {
XCluster result = downloadAndSearch(textToSearch, cluster);
cancelOthers = yourPredicateOfSuccess();
return result;
} finally {
if (cancelOthers) {
shutdownService.execute(() -> {
executorService.shutdownNow();
});
}
}
}));
}
The other thread and the try-finally is important because this makes sure that you won't cancel the almost-successful method run.
Upvotes: 0