I'm struggling with the functional style of Supplier<U> etc. and with writing testable code.
So I have an InputStream that is split into chunks which are processed asynchronously, and I want to know when they are all done. To write testable code, I outsource the processing logic to its own Runnable:
    public class StreamProcessor {
        public CompletableFuture<Void> process(InputStream in) {
            // typed list instead of the raw CompletableFuture type
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            while (true) {
                try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
                    byte[] data = IOUtils.toByteArray(chunkStream);
                    CompletableFuture<Void> f = CompletableFuture.runAsync(createTask(data));
                    futures.add(f);
                } catch (EOFException ex) {
                    // end of stream reached
                    break;
                } catch (IOException ex) {
                    return CompletableFuture.failedFuture(ex);
                }
            }
            return CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
        }

        // factory method, so a test can substitute its own task
        ChunkTask createTask(byte[] data) {
            return new ChunkTask(data);
        }

        public class ChunkTask implements Runnable {
            final byte[] data;

            ChunkTask(byte[] data) {
                this.data = data;
            }

            @Override
            public void run() {
                try {
                    // do something
                } catch (Exception ex) {
                    // checked exceptions must be wrapped
                    throw new RuntimeException(ex);
                }
            }
        }
    }
This works well, but poses two problems:
1. The processing logic cannot return a result, because it is a Runnable after all.
2. Any checked exception thrown in ChunkTask.run() must be wrapped into a RuntimeException. Unwrapping the failed combined CompletableFuture returns the RuntimeException, which needs to be unwrapped again to reach the original cause, in contrast to the IOException raised while reading the stream.
So I'm looking for a way to do this with CompletableFuture.supplyAsync(), but I can't figure out how to do it without lambdas (hard to test) or how to return a CompletableFuture.failedFuture() from the processing logic.
Upvotes: 2
I can think of two approaches:
1. With supplyAsync:
When using CompletableFuture.supplyAsync, you need a Supplier instead of a Runnable:
    public static class ChunkTask implements Supplier<Object> {
        final byte[] data;

        ChunkTask(byte[] data) {
            this.data = data;
        }

        @Override
        public Object get() {
            Object result = ...;
            // Do something or throw an exception
            return result;
        }
    }
and then:
    CompletableFuture
        .supplyAsync( new ChunkTask( data ) )
        .whenComplete( (result, throwable) -> ... );
If an exception is thrown in Supplier.get(), it will be propagated and you can observe it in CompletableFuture.whenComplete, CompletableFuture.handle or CompletableFuture.exceptionally.
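Here is a minimal sketch of approach 1, in case it helps. Everything in it is made up for illustration: the class name SupplierChunkDemo, the Integer result type, the process helper and the "empty chunk" failure. Since Supplier.get() cannot throw checked exceptions, the sketch wraps the IOException in a CompletionException. As far as I know, supplyAsync wraps any other exception in a CompletionException itself but stores one that is thrown directly as-is, so a single getCause() should reach the original exception.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class SupplierChunkDemo {

    /** Hypothetical task: returns the chunk length, fails on an empty chunk. */
    static class ChunkTask implements Supplier<Integer> {
        final byte[] data;

        ChunkTask(byte[] data) {
            this.data = data;
        }

        @Override
        public Integer get() {
            try {
                return process(data);
            } catch (IOException ex) {
                // Supplier.get() cannot throw checked exceptions, so wrap once;
                // the future keeps a CompletionException instead of wrapping it again
                throw new CompletionException(ex);
            }
        }

        // Stand-in for the real processing logic (an assumption of this sketch)
        static int process(byte[] data) throws IOException {
            if (data.length == 0) {
                throw new IOException("empty chunk");
            }
            return data.length;
        }
    }

    /** Runs one task and describes either its result or the cause of its failure. */
    static String describe(byte[] data) {
        return CompletableFuture
                .supplyAsync(new ChunkTask(data))
                .handle((result, throwable) -> throwable == null
                        ? "ok: " + result
                        : "failed: " + throwable.getCause().getMessage())
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(new byte[] {1, 2, 3})); // ok: 3
        System.out.println(describe(new byte[0]));          // failed: empty chunk
    }
}
```

Because ChunkTask is a plain class, a unit test can call new ChunkTask(data).get() directly, without involving any executor.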
2. Passing a CompletableFuture to the thread
You can pass a CompletableFuture to ChunkTask:
    public class ChunkTask implements Runnable {
        final byte[] data;
        private final CompletableFuture<Object> future;

        ChunkTask(byte[] data, CompletableFuture<Object> future) {
            this.data = data;
            this.future = future;
        }

        @Override
        public void run() {
            try {
                Object result = null;
                // do something
                future.complete( result );
            } catch (Throwable ex) {
                future.completeExceptionally( ex );
            }
        }
    }
Then the logic becomes:
    while (true) {
        CompletableFuture<Object> f = new CompletableFuture<>();
        try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
            byte[] data = IOUtils.toByteArray(chunkStream);
            startThread(new ChunkTask(data, f));
            futures.add(f);
        } catch (EOFException ex) {
            // end of stream reached
            break;
        } catch (IOException ex) {
            f.completeExceptionally( ex );
            return f;
        }
    }
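To make this concrete, here is a rough, self-contained sketch of approach 2. It assumes an ExecutorService in place of startThread (which is not defined above) and uses an invented "empty chunk" failure as a stand-in for real processing errors. The names ExplicitFutureDemo and processAll are hypothetical. The point it tries to show is that a checked exception passed to completeExceptionally needs no RuntimeException wrapper, so the combined allOf future exposes it after a single getCause().

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExplicitFutureDemo {

    /** Hypothetical task that completes the future it was handed. */
    static class ChunkTask implements Runnable {
        final byte[] data;
        final CompletableFuture<Integer> future;

        ChunkTask(byte[] data, CompletableFuture<Integer> future) {
            this.data = data;
            this.future = future;
        }

        @Override
        public void run() {
            try {
                if (data.length == 0) {
                    throw new IOException("empty chunk"); // stand-in failure
                }
                future.complete(data.length);
            } catch (Throwable ex) {
                // the checked exception is stored directly, no RuntimeException wrapper
                future.completeExceptionally(ex);
            }
        }
    }

    /** Processes all chunks; returns the cause of the combined failure, or null on success. */
    static Throwable processAll(List<byte[]> chunks) {
        // assumption: a small fixed pool stands in for startThread(...)
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (byte[] chunk : chunks) {
                CompletableFuture<Integer> f = new CompletableFuture<>();
                executor.execute(new ChunkTask(chunk, f));
                futures.add(f);
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return null;
        } catch (CompletionException ex) {
            return ex.getCause(); // one unwrap reaches the original exception
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Throwable cause = processAll(List.of(new byte[] {1}, new byte[0]));
        System.out.println(cause); // java.io.IOException: empty chunk
    }
}
```

In a test, the task can be run synchronously with new ChunkTask(data, future).run(), and the assertions can then be made on the future.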
Approach 2 probably gives you more flexibility in how you handle the exception.
Upvotes: 2