Reputation: 3
EDIT: Thank you, Mark. For anyone with a similar issue: my mistake was creating a Thread instance from the Runnable class first, then submitting that Thread to the ExecutorService.
It also helped me figure out that when a task submitted to an ExecutorService throws an uncaught exception, nothing is reported: the task simply stops, with no notification. That is why I was getting incomplete processing.
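For anyone who wants to see the swallowed exception, here is a minimal sketch. It assumes the task is submitted with `submit`, so the exception is stored in the returned `Future` and only surfaces when `get()` is called. The class name, the method name, and the failure message are made up for illustration and are not from my project.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SwallowedExceptionDemo {

    // Submits a task that always throws, then asks the Future what happened.
    // Returns the message of the exception thrown inside the task,
    // or null if the task finished cleanly or the wait was interrupted.
    static String runFailingTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The cast makes it explicit that the Runnable overload of submit is used.
            Future<?> future = executor.submit((Runnable) () -> {
                throw new IllegalStateException("record 42 is broken"); // hypothetical failure
            });
            try {
                future.get(); // blocks until done; rethrows the failure as ExecutionException
                return null;
            } catch (ExecutionException e) {
                return e.getCause().getMessage(); // the original exception is the cause
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task failed with: " + runFailingTask());
    }
}
```

Without the `get()` call nothing is printed or logged, which matches what I was seeing.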
I have an ArrayList of objects that I want to process in batches on multiple threads, while limiting the number of threads running at any given time. I found that the ExecutorService can handle that. But when I tested whether it was processing every record, it seemed to process only a very small fraction of the objects I pass to it.
EDIT: I removed the multi-threading part and processed the objects normally, without an executor service, on a small batch (only 710), and it works fine. Is there a chance that the threads are completing too fast and being handled incorrectly? This is normally meant to handle around 300k-800k records at once, which is why I would like to multi-thread it.
public void processContainerRecords(ArrayList<? extends ContainerRecord> records) {
    int cores = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(cores);
    int batchSize = Settings.LOGIC_BATCH_SIZE;//100
    int batches = (int) Math.ceil((double) records.size() / (double) batchSize);
    ArrayList<Future<?>> threads = new ArrayList<Future<?>>();
    LogicProcessor newHandler = null;
    for (int startIndex = 0; startIndex < records.size(); startIndex += batchSize + 1) {
        if (records.size() < batchSize) {
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, records.size()));
        } else {
            int bound = (startIndex + batchSize);
            if (bound > records.size()) {
                bound = records.size();
            }
            newHandler = new LogicProcessor(mainGUI, records.subList(startIndex, bound));
        }
        Thread newThread = new Thread(newHandler);
        Future<?> f = executor.submit(newThread);
        threads.add(f);
    }
    executor.shutdown();
    int completedThreads = 0;
    while (!executor.isTerminated()) {//monitors threads and waits until completion
        completedThreads = 0;
        for (Future<?> f : threads) {
            if (f.isDone()) {
                completedThreads++;
            }
        }
        //currentProgress = completedThreads;
    }
    for (ContainerRecord record : records) {//checks if each record has been processed
        System.out.println(record.getContainer() + ":" + record.isTouched());
    }
}
This is the LogicProcessor class that the thread instances are created from:
    private List<? extends ContainerRecord> archive;
    private GUI mainGUI;

    public LogicProcessor(GUI mainGUI, List<? extends ContainerRecord> records) {
        this.mainGUI = mainGUI;
        this.archive = records;
    }

    @Override
    public void run() {
        handleLogic();
    }

    private void handleLogic() {
        Iterator iterator = archive.iterator();
        while (iterator.hasNext()) {
            ContainerRecord record = (ContainerRecord) iterator.next();
            record.touch();//sets a boolean in the object to validate if it has been processed yet.
        }
    }
Output: out of 710 records (objects) submitted, 691 were never processed/touched, and only 19 were.
What is going wrong here? I've tried many things, even creating an array of LogicProcessor instances and keeping them in the array so that the GC could not remove them. I'm unsure why it isn't processing these records.
Upvotes: 0
Views: 608
Reputation: 42481
I don't have a computer to run tests on right now, so this answer is based on reading your code and on personal experience. It may even look like a code review, because a lack of code clarity is a source of bugs :)
Do not submit a new Thread into the executor service. The whole point of executors is to hide the work with threads from the user. Instead, your LogicProcessor should implement the Runnable or Callable interface, depending on whether you want to return a value or not.
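A minimal sketch of that idea, assuming the processor only needs to mark records and return nothing. `Record` and `Processor` here are simplified, hypothetical stand-ins for `ContainerRecord` and `LogicProcessor`, not the real classes from the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SubmitRunnableDemo {

    // Hypothetical stand-in for ContainerRecord: it only carries the "touched" flag.
    static class Record {
        private volatile boolean touched;
        void touch() { touched = true; }
        boolean isTouched() { return touched; }
    }

    // Hypothetical stand-in for LogicProcessor: a plain Runnable, never wrapped in a Thread.
    static class Processor implements Runnable {
        private final List<Record> batch;

        Processor(List<Record> batch) { this.batch = batch; }

        @Override
        public void run() {
            for (Record record : batch) {
                record.touch();
            }
        }
    }

    // Builds recordCount records, processes them on a pool, and returns how many were touched.
    static long processAll(int recordCount) {
        List<Record> records = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            records.add(new Record());
        }
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(new Processor(records)); // pass the Runnable itself, no new Thread(...)
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS); // wait for the submitted task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return records.stream().filter(Record::isTouched).count();
    }

    public static void main(String[] args) {
        System.out.println(processAll(710) + " records touched");
    }
}
```

The pool owns the worker threads, so the task only has to describe the work.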
Check the batch-partitioning logic again. If you use Guava, it already has an implementation of partitioning logic. See this tutorial. I admit it's more of a personal preference, and your code might be just fine as well; I haven't checked in depth.
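For comparison, here is a small plain-Java sketch of partitioning with `subList`. The class and method names are hypothetical. Note that `subList(from, to)` excludes the upper bound, so the loop advances by exactly `batchSize`; as far as I can tell, a step of `batchSize + 1` like the one in the question would skip one record between consecutive batches. Guava's `Lists.partition(list, size)` does the same job if that library is available.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPartitionDemo {

    // Splits a list into consecutive subList views of at most batchSize elements each.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size()); // upper bound is exclusive
            batches.add(items.subList(start, end));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 710; i++) {
            items.add(i);
        }
        List<List<Integer>> batches = partition(items, 100);
        int total = 0;
        for (List<Integer> batch : batches) {
            total += batch.size();
        }
        System.out.println(batches.size() + " batches, " + total + " items"); // 8 batches, 710 items
    }
}
```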
The shutdown and futures handling could be simplified.
Calling the shutdown method causes the executor service to stop accepting new tasks, but it won't close the service immediately; instead, it waits until all the tasks it already has have been executed. Usually such a thread pool is created at the beginning of the application's lifecycle and stays alive as long as the application runs. Creating a pool is pretty expensive since it allocates threads.
If you want to keep the pool open but make sure that all tasks are done, you can loop over the futures as you do.
So I don't really see a reason to use both. If you allocate the pool just to submit a bunch of tasks, calling shutdown is sufficient. Otherwise, you can use the loop, treat the pool as a global object, and call shutdown elsewhere, as I've explained above.
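As a rough sketch of the "allocate, submit, shut down" variant: `shutdown` can be followed by `awaitTermination`, which blocks until the submitted tasks finish and so replaces a busy `while (!executor.isTerminated())` loop. The class name, the task body, and the one-minute timeout are assumptions chosen for illustration only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownAndWaitDemo {

    // Submits taskCount trivial tasks, waits for the pool to drain,
    // and returns how many tasks actually ran.
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(cores);

        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> { completed.incrementAndGet(); }); // placeholder for real work
        }

        executor.shutdown(); // no new tasks accepted; already-submitted tasks still run
        try {
            // Block instead of spinning; the timeout is an arbitrary illustrative value.
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow(); // give up on tasks that are still pending
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(50) + " tasks completed");
    }
}
```

If progress reporting is needed, the list of futures can still be polled from another thread; the point is only that waiting for completion does not require it.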
Upvotes: 0