Marina

Reputation: 4064

ExecutorService-related design questions

I would like to validate the design of a multithreaded app I wrote and get clarification/re-assurance on a few points. I apologize in advance for such a long post - I considered splitting it into a few questions, but they would all have to reference the same code and they all seemed inter-related, so I opted to put everything in one post. If this is not appropriate - please let me know and I will break this into multiple posts.

Here is what I have:

  1. BatchService (a Spring singleton bean): accepts requests to upload a specified directory or a zip archive. For that, it holds an ExecutorService servicePool. On each request (a transactional method), it submits a new BatchUploader Callable task to the pool and stores the returned Future in a map. It provides methods to get the status of all uploads and to cancel all uploads. It also starts a new BatchMonitor thread to monitor the progress of uploads and update the queues that hold completed and not-completed upload infos. Finally, it cleans up all resources when the bean is about to be destroyed (using Spring's @PreDestroy callback).
  2. BatchUploader is a Callable task and it also has its own ExecutorService batchPool to upload individual files. In its call() method it scans the directory or zip archive and, for each file, submits a SingleFileUploader Callable task to its pool.
  3. SingleFileUploader is a Callable task and in its call() method it does all the work uploading and processing the file and returns some status.

And here is some real and some pseudo code:

public class BatchService {

private ExecutorService servicePool;
private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures = new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>();
// keep last 100 unsuccessful uploads
private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches = new ConcurrentLinkedQueue<SingleBatchUploadResult>();
// keep last 100 successful uploads
private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>();
private Thread monitorThread;

public BatchService() {
    servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS);
    monitorThread = new Thread(new BatchMonitor());
    monitorThread.setDaemon(true);
    monitorThread.start();
}

@Transactional
public void processUpload(String uploadId, String contentName) {
    Future<SingleBatchUploadResult> taskFuture = servicePool.submit(new BatchUploader(uploadId, contentName));
    activeBatchFutures.put(uploadId, taskFuture);
}

@PreDestroy
public void preDestroy() {
    // stop the monitor thread
    monitorThread.interrupt();
    // stop all executors and their threads
    cancelAllTasks();
}

public void cancelAllTasks(){
    List<Runnable> waitingTasks =  servicePool.shutdownNow();
    for (Runnable task: waitingTasks){
        // examine which tasks are still waiting, if necessary            
    }
}

public boolean cancelBatchById(String uploadId){
    Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId);
    // a cancelled Future also reports isDone() == true, so a single check suffices
    if (resultFuture != null && !resultFuture.isDone()){
        resultFuture.cancel(true);
        return true;
    } 
    // this task was either already finished, cancelled, not submitted or unknown
    return false;
}

public void getCurrentStatus(){
    // just print out the sizes of queues for now
    System.out.println("number of active uploads: " + activeBatchFutures.size());            
    System.out.println("number of successfully completed uploads: " + completedBatches.size());            
    System.out.println("number of failed uploads: " + notCompletedBatches.size());                   
}


public class BatchMonitor implements Runnable {
    @Override
    public void run() {
        boolean cont = true;
        while (cont) {
            if (Thread.currentThread().isInterrupted()){
                // the thread is being shut down - get out
                cont = false;
                break;
            }                 
            Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator = activeBatchFutures.entrySet().iterator();
            // remove completed Futures from the map
            // add successfully completed batches to completedBatches queue
            // add all other batches to notCompletedBatches queue
            while (iterator.hasNext() && cont){
                Entry<String, Future<SingleBatchUploadResult>> entry = iterator.next();
                String uploadId = entry.getKey();
                Future<SingleBatchUploadResult> batchUploadFuture = entry.getValue();
                if (batchUploadFuture.isCancelled()) {                        
                    addToNotCompleted(defaultResult);
                    // remove this future from the active list
                    activeBatchFutures.remove(uploadId);                        
                } else if (batchUploadFuture.isDone()){
                    try {
                        SingleBatchUploadResult result = batchUploadFuture.get();
                        if (UploadStatus.SUCCESS.equals(result.getUploadStatus()))
                            addToCompleted(uploadId);
                        else 
                            addToNotCompleted(result);
                    } catch (InterruptedException e) {
                        // the thread is being shut down - stop processing
                        cont = false;
                        // preserve interruption state of the thread
                        Thread.currentThread().interrupt();
                        break;
                    } catch (ExecutionException e) {
                        addToNotCompleted(defaultResult);
                    }
                    // remove this future from the active list
                    activeBatchFutures.remove(uploadId);
                } else {
                    // the task has not finished yet - let it be
                    // TODO if a Future is not complete - see how old it is [how ?] If older then timeout - cancel it
                    // For now, rely on the ExecutorService timeout set on the BatchUploader 
                }

            }
            // try to sleep for 5 sec, unless the thread is being shutdown
            if (!Thread.currentThread().isInterrupted()){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    cont = false;
                    // preserve interruption state of the thread
                    Thread.currentThread().interrupt();
                }
            }

        }
        System.out.println("BatchMonitor.run() has terminated");
    }

    public void addToCompleted(String uploadId){
        int currentSize = completedBatches.size();
        // evict oldest entries so the queue stays within MAX after the offer
        if (currentSize >= MAX_SUCCESSFUL_RESULTS) {
            int delta = currentSize - MAX_SUCCESSFUL_RESULTS + 1;
            while (delta > 0){
                completedBatches.poll();
                delta--;
            }
        }
        completedBatches.offer(uploadId);
    }

    public void addToNotCompleted(SingleBatchUploadResult result){
        int currentSize = notCompletedBatches.size();
        // evict oldest entries so the queue stays within MAX after the offer
        if (currentSize >= MAX_UNSUCCESSFUL_RESULTS) {
            int delta = currentSize - MAX_UNSUCCESSFUL_RESULTS + 1;
            while (delta > 0){
                notCompletedBatches.poll();
                delta--;
            }
        }
        notCompletedBatches.offer(result);
    }

}
}

public class BatchUploader implements Callable<SingleBatchUploadResult> {

private ExecutorService executorService;
// Map<fileName, Future result> - holds Futures for all files that were submitted for upload (those that did not fail validation)
private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures = new ConcurrentHashMap<String, Future<SingleFileUploadResult>>();
private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles = new ConcurrentHashMap<String, SingleFileUploadResult>();
private int totalFilesToUpload = 0;

public BatchUploader(...) {
    executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH);
}

public SingleBatchUploadResult call() {
// do some validation
     if (/* this is a valid ZIP file */) {
        String errorMessage = processZipArchive(threadName, contentName);
        // the errorMessage will be not null if there were some exceptions that happened during the zip archive read:
        // opening the ZIP archive, reading entries or thread interruption exceptions
        if (errorMessage != null) {
    ...
            return errorBatchUploadResult;                
        }
     }        
    // all tasks are submitted - stop the service from accepting new requests and shutdown when done
    executorService.shutdown();

    // now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds
    try {
        boolean finishedInTime = executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
        if (!finishedInTime) {
            // timed out waiting: interrupt running tasks and discard queued ones
            executorService.shutdownNow();
        }
    } catch (InterruptedException e) {
        // try to shutdown all running tasks and stop waiting tasks from being scheduled;
        executorService.shutdownNow();
        // preserve interruption state of the thread
        Thread.currentThread().interrupt();
        return errorBatchUploadResult; 
    }

    // at this point, we either finished all tasks (awaitTermination finished before timeout),
    // or we timed out waiting. Get the latest status of each task
    List<String> successfullyUploadedFiles = new LinkedList<String>();
    for (String entryName : uploadTaskFutures.keySet()) {
        Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
        try {
            if (future.isCancelled()) {
                ...
                notUploadedFiles.putIfAbsent(entryName, taskResult);                   
            } else if (future.isDone()) {
                // this task has finished
                taskResult = future.get();
                if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS))
                    successfullyUploadedFiles.add(entryName);
                else
                    notUploadedFiles.putIfAbsent(entryName, taskResult);                   
            } else {
                // this task is either not started yet or not finished yet 
                …
                notUploadedFiles.putIfAbsent(entryName, sometaskResult);
            }
        } catch (InterruptedException e){
            // this is  a signal to stop processing
            batchUploadResult.setTotalFilesToUpload(totalFilesToUpload);
            batchUploadResult.setNotUploadedFiles(notUploadedFiles);
            batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles);
            batchUploadResult.setStatusMessage(statusMessage);
            batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
            // cancel/stop all executing/waiting SingleFileUpload tasks
            executorService.shutdownNow();
            // preserve interruption state of the thread
            Thread.currentThread().interrupt();
            return batchUploadResult;
        } catch (ExecutionException e) {
            // we do not know what the state of this task is 
            …
            notUploadedFiles.putIfAbsent(entryName, sometaskResult);
        }            
    }
    ...
    return batchUploadResult;
}

private String processZipArchive(String threadName, String zipName) {
   // do all ZIP-reading work here
        while (/* a valid file entry is found */)
        {
            if (Thread.currentThread().isInterrupted()){
                // this batch uploader thread is being shut down -  stop all SingleFileUpload tasks
                executorService.shutdownNow();
                return errorMessage;
            } 
            // do a try while processing individual files to be able to gather info about failed files but continue processing good ones
            try {
                // read the file and pass it for processing to SingleFileUploader
                Future<SingleFileUploadResult> taskFuture = executorService.submit(new SingleFileUploader(uploadId, bytesContent, zipEntryName));
                uploadTaskFutures.put(zipEntryName, taskFuture);
                ...
             } catch (some exceptions) {
                  notUploadedFiles.put(zipEntryName, taskResult);
            }
        }
return errorMessage;
}    
}

public class SingleFileUploader implements Callable<SingleFileUploadResult> {
...    
@Override
public SingleFileUploadResult call() {
    // check if there was a cancellation request
    if (Thread.currentThread().isInterrupted()){
        // this file uploader thread is being shut down - get out            
        return errorResult;
    } 
    // do the real work here
    return result;
}

}

All this works just fine in regular scenarios. However, I would still like to hear your opinion on whether there are better/more reliable ways to do what I want, especially in the following areas:

  1. I am using a separate thread, BatchMonitor, to keep track of what is active, done, and not done yet, by periodically scanning the map of active Futures and moving entries into the "successfully completed" or "notCompleted [failed]" queues. I wonder if there is a better way to do that?

  2. I use concurrent unbounded queues for that - and bound them to a specified max size myself as I keep adding items. I could not find a "bounded concurrent queue" in the standard JDK libs - there are only unbounded ones - and I wish I could use the EvictingQueue from Guava, but it is bundled into the 15.0 release, which does not seem to be out yet… So I settled on limiting the size of the queues myself, at the expense of using the size() operation, which I know is a problem with concurrent queues as it does a full traversal of the queue… My reasoning is that this might be OK if I keep the queues small - 100 elements in my case.

  3. Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that reads them is the BatchService thread. The only time I can get into an out-of-sync situation is when BatchService tries to get the status of a particular upload. It is possible that the upload was already removed from the activeBatchFutures map but not yet placed into either the "completed" or "notCompleted" queue, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. But I am OK to live with an occasional "not found" status returned for a particular upload - asking for the status a second time would get the correct result.

  4. BatchService is a Singleton bean - which brings its own scalability issues, since all requests to this bean will be throttled. Another option could be to make each BatchUploader a Spring bean and limit the number of beans, but then how would I do the overall monitoring?

  5. Handling timeouts and cancellations: I'm trying to make this app bullet-proof when it comes to resource cleanup - I'm trying to handle all thread-interruption cases and stop processing so that threads can be shut down. I'm relying on InterruptedException being caught and handled in the BatchUploader to propagate the event to the individual SingleFileUploader tasks, by calling batchPool.shutdownNow(). Can you see any potential cases where I might have runaway threads - when the JVM shuts down, when the app is re-deployed in a web container, …?

Thanks!

Marina

Upvotes: 3

Views: 783

Answers (1)

Zim-Zam O'Pootertoot

Reputation: 18148

  1. Use Guava's ListenableFuture instead of your BatchMonitor - the ListenableFuture can execute a callback as soon as a Future is complete, which obviates the need for you to use a thread to monitor your Futures.
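
For illustration, a minimal sketch of this pattern applied to the question's BatchService (assuming a pre-15.0 Guava, where the two-argument Futures.addCallback(future, callback) is available, and assuming the addToCompleted/addToNotCompleted helpers are moved from BatchMonitor onto BatchService):

import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

// decorate the existing pool so that submit() returns ListenableFutures
private ListeningExecutorService servicePool =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS));

public void processUpload(final String uploadId, String contentName) {
    ListenableFuture<SingleBatchUploadResult> taskFuture =
            servicePool.submit(new BatchUploader(uploadId, contentName));
    activeBatchFutures.put(uploadId, taskFuture);

    // the callback fires as soon as the Future completes - no polling thread needed
    Futures.addCallback(taskFuture, new FutureCallback<SingleBatchUploadResult>() {
        @Override
        public void onSuccess(SingleBatchUploadResult result) {
            activeBatchFutures.remove(uploadId);
            if (UploadStatus.SUCCESS.equals(result.getUploadStatus())) {
                addToCompleted(uploadId);
            } else {
                addToNotCompleted(result);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            // fires on cancellation (CancellationException) as well as on failures in call()
            activeBatchFutures.remove(uploadId);
            addToNotCompleted(defaultResult); // defaultResult as in the question's monitor code
        }
    });
}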

  2. Use an ArrayBlockingQueue, which is a bounded concurrent queue. Use take in the consumer threads to remove an item and block if the queue is empty, and offer(E e, long timeout, TimeUnit unit) in the producer threads to add an item and block (for timeout units) if the queue is full.
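
For example, a small self-contained sketch (the class name CompletedBatchesQueue is illustrative; note that a full bounded queue pushes back on the producer, which differs from the "keep the last 100" eviction in the question's trimming code):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class CompletedBatchesQueue {
    // bounded and thread-safe; size() is O(1), unlike ConcurrentLinkedQueue
    private final BlockingQueue<String> completedBatches = new ArrayBlockingQueue<String>(100);

    // producer side: wait up to 1 second for free space instead of growing without bound
    public void recordCompleted(String uploadId) throws InterruptedException {
        boolean added = completedBatches.offer(uploadId, 1, TimeUnit.SECONDS);
        if (!added) {
            // still full after the timeout: drop the oldest entry to make room
            // (the poll/offer pair is not atomic, which is tolerable for status reporting)
            completedBatches.poll();
            completedBatches.offer(uploadId);
        }
    }

    // consumer side: take() blocks until an element becomes available
    public String nextCompleted() throws InterruptedException {
        return completedBatches.take();
    }
}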

  3. If you use ListenableFutures then you shouldn't need a BatchMonitor or a concurrent queue

  4. I recommend that you check Thread.currentThread().isInterrupted() on each iteration of your for (String entryName : uploadTaskFutures.keySet()) loop, as you are not calling a method that throws InterruptedException on all code paths (e.g. if you keep going through the else path then it might be a while before you notice that the interrupted flag is set).
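
For example, a sketch of that check at the top of the loop (the cleanup mirrors what the question's InterruptedException handler already does):

for (String entryName : uploadTaskFutures.keySet()) {
    // poll the flag explicitly: none of the isCancelled()/isDone() branches block,
    // so an interrupt could otherwise go unnoticed until the loop finishes
    if (Thread.currentThread().isInterrupted()) {
        executorService.shutdownNow();
        batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
        return batchUploadResult;
    }
    Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
    // ... existing isCancelled()/isDone() handling ...
}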

Upvotes: 3
