Harisingh Rajput
Harisingh Rajput

Reputation: 33

CompletableFuture implementation with dynamic multiple threads not working

Regarding CompletableFuture functionality, if anyone has idea about it is not creating multiple threads dynamically. For that i have tried with executorService also in below code but executorService has a fixed thread pool so it goes to blocking state. Can you please help to implement CompletableFuture in multithreading dynamically as per below code?

private static CompletableFuture<Integer> createCompletableFuture(ByteArrayOutputStream baOS, int totalBytes, 
            List<FileUploadMultiLocator> fileUploadList) {
        CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
                () -> {
                    try {
                            // simulate long running task
                            for (FileUploadMultiLocator fileUploadMultiLocator : fileUploadList) {
                                System.out.println(Thread.currentThread().getName() + " secondary task is called");
                                fileUploadMultiLocator.baOS.write(baOS.toByteArray(), 0, totalBytes);
                                fileUploadMultiLocator.setTotalBytes(totalBytes);
                                new Thread(fileUploadMultiLocator).start();
                                try {
                                    Thread.sleep(5);
                                } catch (InterruptedException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                                }
                            }
                        }
                    catch (Exception e) { }
                    return 20;
                });
        return futureCount;
    }

Upvotes: 0

Views: 2700

Answers (1)

amanin
amanin

Reputation: 4184

I think you should make things simpler. Instead of creating N threads in a single asynchronous task (that's what a completable future is), I think you should loop over your list in the main thread, and for each element, create a completableFuture, giving it your "FileUploadMultiLocator" runnable. Each completable future is an asynchronous task in the Java common fork-join pool.

However, if you want control over the number of threads used for your tasks, you can use CompletableFuture.runAsync(Runnable r, Executor e). This way, you can prepare a thread pool with wanted number of Threads (using static methods in Executors class, and use it for your work.

Note that you have other tools at your disposal in java. You could also choose one of the 2 solutions below:

Java 8 Streams

if you can use Java 8, you should rather use Stream power. You can read more about it here.

In summary, the streams allow you parallel browsing/processing of a collection of objects. For your example, I would use the following approach:

        // Specify the wanted number of parallel tasks, and create a thread pool accordingly.
    final int threadNb = 4;
    final ForkJoinPool pool = new ForkJoinPool(threadNb);
    // We give the entire procedure to the thread pool, which will be in charge of managing executions
    pool.submit(
        // Ask to execute each runnable in the list, in a parallel way.
        () -> fileUploadList.parallelStream().forEach(r -> r.run())
    );

    // Below block is in charge of waiting complete execution.
    pool.shutdown();
    try {
        pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException ex) {
        // Do whatever cancelling logic you want, or let it propagate
    }

Executor service

However, you are not forced to use streams, and you can work directly with an executor service. Why you should use this approach instead of creating threads yourself is described in the following answer :

        final List<Runnable> fileUploadList = null;
    // Specify the wanted number of parallel tasks.
    final int threadNb = 4;
    ExecutorService threadPool = Executors.newFixedThreadPool(threadNb);
    for (final Runnable r : fileUploadList) {
        // ... Do your pre-computing here ...
        // Now, submit asynchronous part of your task for execution
        threadPool.submit(r);
    }

    // Below block is in charge of waiting complete execution.
    threadPool.shutdown();
    try {
        threadPool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException ex) {
        // Do whatever cancelling logic you want, or let it propagate
    }

Also, note that here, I create a on-shot executor service. But, the common thing to do is to create it somewhere in your app, and keep it alive, so you can re-use its threads multiple times. In this case, the code I give you is wrong for the 'waiting completion' part, and you should monitor each task future instead.

One last thing: my examples are really simplified use-cases. I manage neither error nor timeout by task, so it needs refining before use. However, I hope it helps !

Upvotes: 1

Related Questions