Reputation: 4064
I would like to validate the design of a multithreaded app I wrote and get clarification/reassurance on a few points. I apologize in advance for such a long post. I considered splitting it into several questions, but they would all reference the same code and they seemed interrelated, so I opted to put everything in one post. If this is not appropriate, please let me know and I will break it into multiple posts.
Here is what I have:
And here is some real and some pseudo code:
public class BatchService {
private ExecutorService servicePool;
// Map<uploadId, Future result> - holds Futures for all batches that are submitted but not yet moved to one of the queues below
private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures = new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>();
// keep last 100 unsuccessful uploads
private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches = new ConcurrentLinkedQueue<SingleBatchUploadResult>();
// keep last 100 successful uploads
private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>();
private Thread monitorThread;
public BatchService() {
servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS);
monitorThread = new Thread(new BatchMonitor());
monitorThread.setDaemon(true);
monitorThread.start();
}
@Transactional
public void processUpload(String uploadId, String contentName) {
Future<SingleBatchUploadResult> taskFuture = servicePool.submit(new BatchUploader(uploadId, contentName));
activeBatchFutures.put(uploadId, taskFuture);
}
@PreDestroy
public void preDestroy() {
// stop the monitor thread
monitorThread.interrupt();
// stop all executors and their threads
cancelAllTasks();
}
public void cancelAllTasks(){
List<Runnable> waitingTasks = servicePool.shutdownNow();
for (Runnable task: waitingTasks){
// examine which tasks are still waiting, if necessary
}
}
public boolean cancelBatchById(String uploadId){
Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId);
// isDone() is also true for cancelled futures, so this single check covers finished and cancelled tasks
if (resultFuture != null && !resultFuture.isDone()) {
resultFuture.cancel(true);
return true;
}
// this task was either already finished, cancelled, not submitted or unknown
return false;
}
public void getCurrentStatus(){
// just print out the sizes of queues for now
System.out.println("number of active uploads: " + activeBatchFutures.size());
System.out.println("number of successfully completed uploads: " + completedBatches.size());
System.out.println("number of failed uploads: " + notCompletedBatches.size());
}
public class BatchMonitor implements Runnable {
@Override
public void run() {
boolean cont = true;
while (cont) {
if (Thread.currentThread().isInterrupted()){
// the thread is being shut down - get out
cont = false;
break;
}
Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator = activeBatchFutures.entrySet().iterator();
// remove completed Futures from the map
// add successfully completed batches to completedBatches queue
// add all other batches to notCompletedBatches queue
while (iterator.hasNext() && cont){
…
if (batchUploadFuture.isCancelled()) {
addToNotCompleted(defaultResult);
// remove this future from the active list
activeBatchFutures.remove(uploadId);
} else if (batchUploadFuture.isDone()){
try {
SingleBatchUploadResult result = batchUploadFuture.get();
if (UploadStatus.SUCCESS.equals(result.getUploadStatus()))
addToCompleted(uploadId);
else
addToNotCompleted(result);
} catch (InterruptedException e) {
// the thread is being shut down - stop processing
cont = false;
// preserve interruption state of the thread
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
addToNotCompleted(defaultResult);
}
// remove this future from the active list
activeBatchFutures.remove(uploadId);
} else {
// the task has not finished yet - let it be
// TODO if a Future is not complete - see how old it is [how ?] If older than timeout - cancel it
// For now, rely on the ExecutorService timeout set on the BatchUploader
}
}
// try to sleep for 5 sec, unless the thread is being shutdown
if (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
cont = false;
// preserve interruption state of the thread
Thread.currentThread().interrupt();
}
}
}
System.out.println("BatchMonitor.run() has terminated");
}
public void addToCompleted(String uploadId){
// evict the oldest entries so that, after the offer below, the queue holds at most MAX_SUCCESSFUL_RESULTS items
while (completedBatches.size() >= MAX_SUCCESSFUL_RESULTS) {
completedBatches.poll();
}
completedBatches.offer(uploadId);
}
public void addToNotCompleted(SingleBatchUploadResult result){
// evict the oldest entries so that, after the offer below, the queue holds at most MAX_UNSUCCESSFUL_RESULTS items
while (notCompletedBatches.size() >= MAX_UNSUCCESSFUL_RESULTS) {
notCompletedBatches.poll();
}
notCompletedBatches.offer(result);
}
}
}
public class BatchUploader implements Callable<SingleBatchUploadResult> {
private ExecutorService executorService;
// Map<fileName, Future result> - holds Futures for all files that were submitted for upload (those that did not fail validation)
private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures = new ConcurrentHashMap<String, Future<SingleFileUploadResult>>();
private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles = new ConcurrentHashMap<String, SingleFileUploadResult>();
private int totalFilesToUpload = 0;
public BatchUploader(...) {
executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH);
}
public SingleBatchUploadResult call() {
// do some validation
if ( this is a correct ZIP file){
String errorMessage = processZipArchive(threadName, contentName);
// the errorMessage will be not null if there were some exceptions that happened during the zip archive read:
// opening the ZIP archive, reading entries or thread interruption exceptions
if (errorMessage != null) {
...
return errorBatchUploadResult;
}
}
// all tasks are submitted - stop the service from accepting new requests and shutdown when done
executorService.shutdown();
// now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds
try {
executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// try to shutdown all running tasks and stop waiting tasks from being scheduled;
executorService.shutdownNow();
// preserve interruption state of the thread
Thread.currentThread().interrupt();
return errorBatchUploadResult;
}
// at this point, we either finished all tasks (awaitTermination finished before timeout),
// or we timed out waiting. Get the latest status of each task
List<String> successfullyUploadedFiles = new LinkedList<String>();
for (String entryName : uploadTaskFutures.keySet()) {
Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
try {
if (future.isCancelled()) {
...
notUploadedFiles.putIfAbsent(entryName, taskResult);
} else if (future.isDone()) {
// this task has finished
taskResult = future.get();
if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS))
successfullyUploadedFiles.add(entryName);
else
notUploadedFiles.putIfAbsent(entryName, taskResult);
} else {
// this task is either not started yet or not finished yet
…
notUploadedFiles.putIfAbsent(entryName, sometaskResult);
}
} catch (InterruptedException e){
// this is a signal to stop processing
batchUploadResult.setTotalFilesToUpload(totalFilesToUpload);
batchUploadResult.setNotUploadedFiles(notUploadedFiles);
batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles);
batchUploadResult.setStatusMessage(statusMessage);
batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
// cancel/stop all executing/waiting SingleFileUpload tasks
executorService.shutdownNow();
// preserve interruption state of the thread
Thread.currentThread().interrupt();
return batchUploadResult;
} catch (ExecutionException e) {
// we do not know what the state of this task is
…
notUploadedFiles.putIfAbsent(entryName, sometaskResult);
}
}
...
return batchUploadResult;
}
private String processZipArchive(String threadName, String zipName) {
// do all ZIP-reading work here
while ( valid file found )
{
if (Thread.currentThread().isInterrupted()){
// this batch uploader thread is being shut down - stop all SingleFileUpload tasks
executorService.shutdownNow();
return errorMessage;
}
// do a try while processing individual files to be able to gather info about failed files but continue processing good ones
try {
// read the file and pass it for processing to SingleFileUploader
Future<SingleFileUploadResult> taskFuture = executorService.submit(new SingleFileUploader(uploadId, bytesContent, zipEntryName));
uploadTaskFutures.put(zipEntryName, taskFuture);
...
} catch (some exceptions) {
notUploadedFiles.put(zipEntryName, taskResult);
}
}
return errorMessage;
}
}
public class SingleFileUploader implements Callable<SingleFileUploadResult> {
...
@Override
public SingleFileUploadResult call() {
// check if there was a cancellation request
if (Thread.currentThread().isInterrupted()){
// this file uploader thread is being shut down - get out
return errorResult;
}
// do the real work here
return result;
}
}
All this works just fine in regular scenarios. However, I would still like to hear your opinion on whether there are better or more reliable ways to do what I want, especially in the following areas:
I am using a separate thread, BatchMonitor, to keep track of what is active, done and not done yet, by periodically scanning the list of active Futures and moving them into "successfully completed" or "notCompleted[failed]" queues. I wonder if there is a better way to do that?
I use unbounded concurrent queues for that and bound them to a specified maximum size myself as I add items to them. I could not find a "bounded concurrent queue" in the standard JDK libraries, only unbounded ones, and I wish I could use the EvictingQueue from Guava, but it is part of the 15.0 release, which does not seem to be out yet. So I settled on limiting the size of the queues myself, at the expense of calling size(), which I know is a problem with concurrent queues because it does a full scan of the queue. My reasoning is that this might be OK if I keep the queues small (100 in my case).
Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that reads the queues is the BatchService thread. The only time I can get into an out-of-sync situation is when BatchService tries to get the status of a particular upload. It is possible that the upload was already removed from the activeBatchFutures map but not yet placed into either the "completed" or the "notCompleted" queue, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. But I am OK to live with occasional "not found" statuses returned for a particular upload; asking for the status a second time would get the correct result.
BatchService is a Singleton bean - which brings its own scalability issues, since all requests to this bean will be throttled. Another option could be to make each BatchUploader a Spring bean and limit the number of beans, but then how would I do the overall monitoring?
Handling timeouts and cancellations: I'm trying to make this app bullet-proof when it comes to resource cleanup. I'm trying to handle all thread interruption cases and stop processing so that threads can be killed. I'm relying on InterruptedException being caught and handled in the BatchUploader, which propagates the event to the individual SingleFileUploader tasks by calling executorService.shutdownNow(). Can you see any potential cases where I might have runaway threads, for example when the JVM shuts down or the app is re-deployed in a web container?
Thanks!
Marina
Upvotes: 3
Views: 783
Reputation: 18148
Use Guava's ListenableFuture instead of your BatchMonitor - the ListenableFuture can execute a callback as soon as a Future is complete, which obviates the need for you to use a thread to monitor your Futures.
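As a rough sketch of the callback idea, here is a version that swaps in the JDK's CompletableFuture (Java 8+) for Guava's ListenableFuture, only so that it runs without extra dependencies. The class name CallbackSketch, the submit method and the shouldFail flag are made up for illustration; the two queues stand in for completedBatches and notCompletedBatches from the question.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackSketch {
    // Hypothetical stand-ins for completedBatches / notCompletedBatches.
    final Queue<String> completed = new ConcurrentLinkedQueue<>();
    final Queue<String> failed = new ConcurrentLinkedQueue<>();

    // Runs a fake upload on the pool and records the outcome in a callback,
    // so no monitor thread has to poll the future.
    CompletableFuture<Void> submit(ExecutorService pool, String uploadId, boolean shouldFail) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (shouldFail) {
                    throw new IllegalStateException("upload failed: " + uploadId);
                }
                return uploadId;
            }, pool)
            .handle((id, error) -> {
                // Invoked once the task has finished, normally or exceptionally.
                if (error == null) {
                    completed.add(id);
                } else {
                    failed.add(uploadId);
                }
                return null;
            });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CallbackSketch sketch = new CallbackSketch();
        CompletableFuture<Void> ok = sketch.submit(pool, "upload-1", false);
        CompletableFuture<Void> bad = sketch.submit(pool, "upload-2", true);
        CompletableFuture.allOf(ok, bad).join(); // wait only so the demo can print
        pool.shutdown();
        System.out.println("completed=" + sketch.completed + " failed=" + sketch.failed);
    }
}
```

With Guava the shape should be similar: wrap the pool in a listening executor and attach the callback to the returned ListenableFuture, but check the Guava documentation for the exact calls in your version.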
Use an ArrayBlockingQueue, which is a bounded concurrent queue. Use take in the consumer threads to remove an item and block if the queue is empty, and offer(E e, long timeout, TimeUnit unit) in the producer threads to add an item and block (for timeout units) if the queue is full.
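A minimal sketch of that producer/consumer usage, assuming a capacity of 2 and a 100 ms timeout purely for the demo (BoundedQueueSketch and offerWithTimeout are illustrative names):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueSketch {
    // Producer side: waits up to 100 ms for free space, returns false if the queue stayed full.
    static boolean offerWithTimeout(BlockingQueue<String> queue, String item) {
        try {
            return queue.offer(item, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // preserve interruption state of the thread and report that nothing was added
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> results = new ArrayBlockingQueue<>(2); // bounded to 2 items
        System.out.println(offerWithTimeout(results, "upload-1"));
        System.out.println(offerWithTimeout(results, "upload-2"));
        // The queue is full now, so this call waits for the timeout and then gives up.
        System.out.println(offerWithTimeout(results, "upload-3"));
        // Consumer side: take() removes the head, blocking if the queue is empty.
        System.out.println(results.take());
        // There is room again, so the retry succeeds.
        System.out.println(offerWithTimeout(results, "upload-3"));
    }
}
```

Note that a full ArrayBlockingQueue makes the producer wait or give up; it does not drop the oldest item the way an evicting "keep the last 100" queue would, so the consumer has to drain it.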
If you use ListenableFutures then you shouldn't need a BatchMonitor or a concurrent queue.
I recommend that you check Thread.currentThread().isInterrupted() on each iteration of your for (String entryName : uploadTaskFutures.keySet()) loop, as you are not calling a method that throws InterruptedException on all code paths (e.g. if you keep going through the else path then it might be a while before you notice that the interrupted flag is set).
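Something along these lines, as a simplified sketch rather than a drop-in replacement for your loop (InterruptCheckSketch and scan are made-up names, and the per-entry bookkeeping is reduced to a counter):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class InterruptCheckSketch {
    // Walks the futures and returns how many entries were examined before stopping.
    static int scan(Map<String, Future<String>> futures) {
        int examined = 0;
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
            // isInterrupted() does not clear the flag, so the caller can still see it.
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            examined++;
            if (!entry.getValue().isDone()) {
                // "not finished yet" path: nothing here blocks or throws InterruptedException,
                // which is why the explicit check at the top of the loop is needed.
                continue;
            }
            // ... the isCancelled() / get() handling from the question would go here ...
        }
        return examined;
    }

    public static void main(String[] args) {
        Map<String, Future<String>> futures = new LinkedHashMap<>();
        futures.put("a.txt", new CompletableFuture<String>()); // never completed in this demo
        futures.put("b.txt", new CompletableFuture<String>());
        System.out.println("examined: " + scan(futures));
        Thread.currentThread().interrupt(); // simulate a cancellation request
        System.out.println("examined after interrupt: " + scan(futures));
        Thread.interrupted(); // clear the flag again before the demo exits
    }
}
```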
Upvotes: 3