Reputation: 105
I am using a bunch of callables to search a list in individual chunks. Once one of them returns true, I want to cancel all the other running callables, but future.cancel is not cancelling them.
My NumberFinder
public class NumberFinderImpl implements NumberFinder {
// how many threads the executor will use to run the callables
private final int NUMBER_THREADS = 20;
// the amount of elements in the array we will search in each callable chunk
private final int CHUNK_ARRAY_SIZE = 5;
@Override
public boolean contains(int valueToFind, List<CustomNumberEntity> arrayToSearch) {
long startTime = System.nanoTime();
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(NUMBER_THREADS);
CompletionService<Boolean> completionService =
new ExecutorCompletionService<>(WORKER_THREAD_POOL);
// cast to double first: integer division would truncate before Math.ceil runs
int numberOfChunksNeeded = (int) Math.ceil((double) arrayToSearch.size() / CHUNK_ARRAY_SIZE);
// get a callable for each chunk search
List<Callable<Boolean>> callablesForEachChunkSearch =
getCallablesForEachChunk(
CHUNK_ARRAY_SIZE, numberOfChunksNeeded, valueToFind, arrayToSearch);
// start the callables and collect the futures
List<Future<Boolean>> futuresForCallables =
callablesForEachChunkSearch
.stream()
.map(completionService::submit)
.collect(Collectors.toList());
for (int j = 0; j < futuresForCallables.size(); j++) {
try {
// take().get() is blocking
// so if a callable is not done yet
// it will wait until it is before proceeding
Boolean chunkResult = completionService.take().get();
if (chunkResult) {
long endTime = System.nanoTime();
long timeTaken = endTime - startTime;
// TimeUnit
long timeInSeconds = TimeUnit.SECONDS.convert(timeTaken, TimeUnit.NANOSECONDS);
System.out.println("Search time in seconds: " + timeInSeconds);
for (Future<Boolean> future : futuresForCallables) {
// cancel all the other running callables
future.cancel(true);
}
return true;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
for (Future<Boolean> future : futuresForCallables) {
// cancel all the other running callables
future.cancel(true);
}
long endTime = System.nanoTime();
long timeTaken = endTime - startTime;
// TimeUnit
long timeInSeconds = TimeUnit.SECONDS.convert(timeTaken, TimeUnit.NANOSECONDS);
System.out.println("Search time in seconds: " + timeInSeconds);
return false;
}
// get a list of callables that each search a certain chunk of the array
private List<Callable<Boolean>> getCallablesForEachChunk(
int chunkArraySize,
int numberOfChunksNeeded,
int valueToFind,
List<CustomNumberEntity> arrayToSearch) {
List<Callable<Boolean>> callableList = new ArrayList<>(numberOfChunksNeeded);
for (int i = 0; i < numberOfChunksNeeded; i++) {
int startPosForCallable = 0;
if (i > 0) {
startPosForCallable = i * chunkArraySize;
}
// don't let the end pos go out of bounds:
// endPos is inclusive, so if the chunk extends past the list, clamp it to the last valid index
int endPosForCallable =
Math.min(startPosForCallable + chunkArraySize - 1, arrayToSearch.size() - 1);
Callable<Boolean> callableSearch =
new NumberFinderCallable(
arrayToSearch, valueToFind, startPosForCallable, endPosForCallable);
callableList.add(callableSearch);
}
return callableList;
}
}
My callable that does the searching
public class NumberFinderCallable implements Callable<Boolean> {
private List<CustomNumberEntity> arrayToSearch;
private int startPos;
private int endPos;
private int valueToSearchFor;
public NumberFinderCallable(
List<CustomNumberEntity> arrayToSearch, int valueToSearchFor, int startPos, int endPos) {
this.arrayToSearch = arrayToSearch;
this.startPos = startPos;
this.endPos = endPos;
this.valueToSearchFor = valueToSearchFor;
}
@Override
public Boolean call() {
System.out.println(
"Callable started, searching the chunk of array with start pos "
+ startPos
+ " and end pos "
+ endPos);
for (int i = startPos; i <= endPos; i++) {
System.out.println(
"Callable is comparing a number in pos "
+ i
+ " in the chunk with start pos "
+ startPos
+ " and end pos "
+ endPos);
if (FastestComparator.compare(valueToSearchFor, arrayToSearch.get(i)) == 0) {
System.out.println("element found in pos " + i + ". Returning true");
return true;
}
}
return false;
}
}
I can see from the logs that even after a true result is found and all the futures are cancelled, the threads are still running.
Upvotes: 0
Views: 247
Reputation: 56
Instead of declaring the pool as an ExecutorService:
ExecutorService WORKER_THREAD_POOL = Executors.newFixedThreadPool(NUMBER_THREADS);
declare it as a ThreadPoolExecutor:
ThreadPoolExecutor WORKER_THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(NUMBER_THREADS);
Then call purge() after cancelling. It tries to remove cancelled futures from the pool's work queue, e.g.
future.cancel(true);
WORKER_THREAD_POOL.purge();
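One thing worth adding: future.cancel(true) only interrupts the worker thread, so a task that is already running keeps going unless its loop checks for the interrupt. A minimal sketch of a callable that does check is below. InterruptibleSearch is a hypothetical, simplified stand-in for NumberFinderCallable that searches plain Integers instead of CustomNumberEntity, so treat it as an illustration rather than a drop-in replacement.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical variant of NumberFinderCallable, simplified to search plain Integers.
class InterruptibleSearch implements Callable<Boolean> {
    private final List<Integer> values;
    private final int target;
    private final int startPos; // inclusive
    private final int endPos;   // inclusive

    InterruptibleSearch(List<Integer> values, int target, int startPos, int endPos) {
        this.values = values;
        this.target = target;
        this.startPos = startPos;
        this.endPos = endPos;
    }

    @Override
    public Boolean call() throws InterruptedException {
        for (int i = startPos; i <= endPos; i++) {
            // cancel(true) only sets the interrupt flag on the worker thread;
            // the task has to look at the flag and stop on its own.
            if (Thread.interrupted()) {
                throw new InterruptedException("search cancelled at index " + i);
            }
            if (values.get(i) == target) {
                return true;
            }
        }
        return false;
    }
}
```

With a check like this, a running task exits at its next loop iteration after future.cancel(true), and tasks that were cancelled before they started never run at all.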
Upvotes: 0
Reputation: 51433
Based on your previous question, you have made it clear that you are trying to parallelize this contains method for performance reasons and not to learn about the API. However, IMO you have made the mistake of assuming that this method actually needs to be optimized.
I ran an ad hoc test on a list with 100000000 (100 million) elements for the worst-case scenario, i.e., trying to find an element that does not exist in the list. For the sequential contains method
list.contains(Integer.MAX_VALUE)
it took on average approximately:
0.25 seconds
And the parallel version using streams:
list.parallelStream().anyMatch(i -> i.equals(Integer.MAX_VALUE))
it took on average approximately:
0.19 seconds
A speedup of 1.32x on a 4-core machine. I highly doubt that one will achieve much more than that. Not to mention the maintainability and readability of:
list.parallelStream().anyMatch(i -> i.equals(Integer.MAX_VALUE))
versus a potential parallel solution that explicitly uses executors and so on.
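For anyone who wants to repeat the comparison, here is a rough sketch of how the two variants can be put side by side. The class and method names (ContainsComparison, containsSequential, containsParallel) are made up for illustration, the list is much smaller than the 100 million elements I used, and a single System.nanoTime() measurement is not a proper benchmark, so expect different numbers.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class; names are for illustration only.
class ContainsComparison {

    // Plain sequential lookup.
    static boolean containsSequential(List<Integer> list, int value) {
        return list.contains(value);
    }

    // Parallel lookup; anyMatch is short-circuiting, so it can stop
    // as soon as one match is found without any manual cancellation.
    static boolean containsParallel(List<Integer> list, int value) {
        return list.parallelStream().anyMatch(i -> i.equals(value));
    }

    public static void main(String[] args) {
        // 1 million elements here (an assumption to keep memory use low).
        List<Integer> list =
            IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());

        long t0 = System.nanoTime();
        boolean sequential = containsSequential(list, Integer.MAX_VALUE);
        long t1 = System.nanoTime();
        boolean parallel = containsParallel(list, Integer.MAX_VALUE);
        long t2 = System.nanoTime();

        System.out.println("sequential: " + sequential + " in " + (t1 - t0) / 1_000 + " us");
        System.out.println("parallel:   " + parallel + " in " + (t2 - t1) / 1_000 + " us");
    }
}
```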
If the contains method is that important performance-wise, and one does not have duplicated elements in the list, one should consider using a HashSet for a constant-time contains method.
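A minimal sketch of the HashSet idea, assuming the values are plain Integers (HashSetLookup and buildIndex are hypothetical names). For a custom type such as CustomNumberEntity, the class would also need consistent equals and hashCode implementations, which the question does not show.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper class; names are for illustration only.
class HashSetLookup {

    // Building the set costs one pass over the list; it pays off
    // when the same data is searched many times.
    static Set<Integer> buildIndex(List<Integer> values) {
        return new HashSet<>(values);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(4, 8, 15, 16, 23, 42);
        Set<Integer> index = buildIndex(values);

        // Each lookup is a hash probe (constant time on average), not a scan.
        System.out.println(index.contains(15));
        System.out.println(index.contains(Integer.MAX_VALUE));
    }
}
```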
Upvotes: 1