Reputation: 31
I am trying to use CompletableFuture to run a for loop in parallel. Inside the loop I use supplyAsync to call doSomething, get an output string, and then put it in a HashMap:
...
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
CompletableFuture<?> completableFuture = null;
for ( int i = 0; i < numberOfRecords; i++ ) {
    final int finalI = i;
    completableFuture = CompletableFuture
            .supplyAsync( () -> doSomething( data, finalI ) )
            .thenAccept( str -> map.put( finalI, str ) );
}
completableFuture.join();
private String doSomething( HashMap<String, String> data, int finalI ) {
    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    for ( int k = 0; k < data.size(); k++ ) {
        //process data and add it in queue
    }
    String result = processQueueAndReturnString(queue);
    return result;
}
The problem is that when the for loop is almost done (when i is close to numberOfRecords), the for loop inside the doSomething method skips some iterations: e.g. if k should reach 5, it may only run until k=2 or 3, and in that case supplyAsync( () -> doSomething( data, finalI ) ) returns null. So it seems like my for loop with CompletableFuture finishes before some iterations are completely done.
Any suggestions or hints on how to fix that?
Upvotes: 0
Views: 4543
Reputation: 718708
So it seems like my for loop with CompletableFuture finishes [before] some iterations are completely done.
Each loop iteration in your example code creates a CompletableFuture
. If you want to wait for all of the work to complete, you need to join all of them, not just the one created by the last iteration.
Something like this (style corrected!):
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
// Generic array creation is not allowed, so create a raw array
// (this produces an unchecked warning, which is safe to suppress here).
@SuppressWarnings("unchecked")
CompletableFuture<Void>[] futures = new CompletableFuture[nosRecords];
for (int i = 0; i < nosRecords; i++) {
    final int finalI = i;
    futures[i] = CompletableFuture
            .supplyAsync(() -> doSomething(data, finalI))
            .thenAccept(str -> map.put(finalI, str));
}
// allOf() returns a future; join() blocks until all of them are done.
CompletableFuture.allOf(futures).join();
Note that you need to change CompletableFuture<?> to CompletableFuture<Void> because the declaration of allOf() (javadoc) requires that. Fortunately, the thenAccept(...) call is already returning a CompletableFuture<Void>.
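If you prefer to avoid the unchecked generic-array creation, a List works just as well. Here is a minimal, self-contained sketch of the same idea; doSomething is a hypothetical stand-in for the asker's real work:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class JoinAllDemo {
    // Hypothetical stand-in for the asker's doSomething.
    static String doSomething(int i) {
        return "record-" + i;
    }

    static Map<Integer, String> runAll(int numberOfRecords) {
        Map<Integer, String> map = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < numberOfRecords; i++) {
            final int finalI = i;
            futures.add(CompletableFuture
                    .supplyAsync(() -> doSomething(finalI))
                    .thenAccept(str -> map.put(finalI, str)));
        }
        // allOf completes only when every element completes; join() blocks until then.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return map;
    }

    public static void main(String[] args) {
        System.out.println(runAll(5).size()); // prints 5
    }
}
```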
The HashMap data is not thread-safe, should it be? I just use it in method doSomething to get the entry value based on index finalI. I don't process that HashMap. I just read it.
There will be a happens-before between the supplyAsync call and the call to its lambda argument. So provided that data does not change during the execution of any of the doSomething calls, they will all see the correct values in the data map.
Assuming things are as you said (and remain that way), it is OK to use a non-synchronized HashMap there.
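To illustrate the point above: as long as the map is fully populated before any task is submitted and never modified afterwards, concurrent read-only access is safe. A minimal sketch (the key names and lookup method are made up for the example):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ReadOnlyMapDemo {
    // Read-only access; safe because data is never modified after the tasks start.
    static String lookup(Map<String, String> data, int i) {
        return data.get("key" + i);
    }

    public static void main(String[] args) {
        // Populate fully BEFORE submitting any task.
        Map<String, String> data = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            data.put("key" + i, "value" + i);
        }

        // supplyAsync establishes a happens-before edge, so each task
        // sees the fully populated map.
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.supplyAsync(() -> lookup(data, 0)),
                CompletableFuture.supplyAsync(() -> lookup(data, 1)),
                CompletableFuture.supplyAsync(() -> lookup(data, 2)));

        futures.forEach(f -> System.out.println(f.join()));
    }
}
```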
Upvotes: 2
Reputation: 338201
The Answer by Stephen C looks correct, and is appropriate for today's Java. But in the future (ahhh, see what I did there?), Java may offer a simpler and much faster approach, with virtual threads.
Project Loom is coming to Java, with preliminary builds available now built on early-access Java 16.
One major feature is virtual threads (fibers). These are lightweight threads. When the flow-of-control in any virtual thread blocks, Java detects the block and switches in another virtual thread to keep the CPU core busy. This can massively speed up threaded code that frequently blocks (as opposed to strictly CPU-bound tasks like video-encoding).
Note that according to Ron Pressler, one of the people working on Project Loom, the need for most of the many methods on CompletableFuture evaporates with virtual threads. You will likely not do more than call get. See his presentations, the latest being 2020-11-11, 2020-09-17, and 2020-07-28.
While I did not capture all the nuances of your business logic, I think I got the gist of it. Similar to Stephen C, I collect all the returned CompletableFuture objects. Then I inspect them to see if they completed successfully or not.
In Project Loom, the ExecutorService is now AutoCloseable. So we can use try-with-resources syntax. The end of your try-block will block until all the submitted tasks are done. This natural blocking replaces the CompletableFuture.allOf(…) call seen in the solution by Stephen C.
Here is a class for our task, a Callable that returns a UUID object. We also sleep each task for a second, to demonstrate a lengthy task. Our task also records its results in a ConcurrentMap we pass to its constructor.
package work.basil.example;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.*;
public class DoSomething implements Callable < UUID >
{
private Integer identifier;
private ConcurrentMap < Integer, UUID > results;
// Constructor
public DoSomething ( Integer identifier , ConcurrentMap < Integer, UUID > resultsMap )
{
this.identifier = identifier;
this.results = resultsMap;
}
@Override
public UUID call ( ) throws Exception
{
Thread.sleep( Duration.ofSeconds( 1 ) );
UUID uuid = UUID.randomUUID();
this.results.put( this.identifier , uuid );
return uuid;
}
}
Here is the code to instantiate and run a bunch of those tasks.
public static void main ( String[] args )
{
System.out.println( "INFO - Java version: " + Runtime.version() );
System.out.println( "INFO - Host OS: " + System.getProperty( "os.name" ) + " version " + System.getProperty( "os.version" ) );
System.out.println( "INFO - arch: " + System.getProperty( "os.arch" ) + " | Available processors (cores): " + Runtime.getRuntime().availableProcessors() );
long maxMemory = Runtime.getRuntime().maxMemory();
System.out.println( "INFO - Maximum memory (bytes): " + ( maxMemory == Long.MAX_VALUE ? "no limit" : String.format( Locale.getDefault() , "%,d" , maxMemory ) ) );
System.out.println( "----------------------------------------------" );
long start = System.nanoTime();
ConcurrentMap < Integer, UUID > results = new ConcurrentSkipListMap <>();
int countTasks = 1_000_000;
System.out.println( "INFO - Starting a run of " + countTasks + ". " + Instant.now() );
List < CompletableFuture < UUID > > futures = new ArrayList <>( countTasks );
try (
ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
for ( int nthTask = 0 ; nthTask < countTasks ; nthTask++ )
{
// In this Loom preview build, submit is assumed to return a CompletableFuture.
futures.add( executorService.submit( new DoSomething( nthTask , results ) ) );
}
}
// At this point, flow-of-control blocks until all submitted tasks finish (are done, or are cancelled).
List < CompletableFuture < UUID > > canceled = new ArrayList <>();
List < CompletableFuture < UUID > > completedExceptionally = new ArrayList <>();
for ( CompletableFuture < UUID > future : futures )
{
if ( future.isCancelled() )
{
canceled.add( future );
} else if ( future.isCompletedExceptionally() )
{
completedExceptionally.add( future );
} else if ( ! future.isDone() )
{
throw new IllegalStateException( "All tasks should be done at this point, normally or interrupted." );
}
// else: the future completed normally; its UUID is already in the results map.
}
Duration duration = Duration.ofNanos( System.nanoTime() - start );
System.out.println( "Done at " + Instant.now() + ". Took: " + duration );
System.out.println( "Problems… canceled size: " + canceled.size() + " | completedExceptionally size: " + completedExceptionally.size() );
System.out.println( "Results size = " + String.format( Locale.getDefault() , "%,d" , results.size() ) );
}
INFO - Java version: 16-loom+9-316
INFO - Host OS: Mac OS X version 10.14.6
INFO - arch: x86_64 | Available processors (cores): 6
INFO - Maximum memory (bytes): 8,589,934,592
----------------------------------------------
INFO - Starting a run of 10000000. 2021-01-01T05:40:28.564019Z
Done at 2021-01-01T05:41:11.567852Z. Took: PT43.006895236S
Problems… canceled size: 0 | completedExceptionally size: 0
Results size = 10,000,000
Running a million of those tasks takes several seconds. Running ten million takes less than a minute.
So you can see how the blocked threads sleeping for a second are obviously not taking up time on a core. If they were taking time on the cores, we'd be waiting a long while: 10,000,000 tasks * 1 second each / 6 cores = 1,666,666 seconds = 462 hours.
Upvotes: 1