Reputation: 242
I am using Java 8 and I have a chain of CompletionStage instances that I am trying to run. I don't want to use join() or get(); I want to explicitly complete the CompletionStage.
I am trying to run two database queries; the second depends on the result of the first. I start a database transaction using a session, run write query 1, then write query 2, and only if both are successful do I want to commit the transaction; otherwise I want to roll it back. The transaction and session are part of the Neo4j Java API: https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-
After both queries have run (success or failure), I want to close the session (a standard database practice).
Here is the pseudocode:
DB Session starts transaction
    run Write Query1
    run Write Query2
    if both are successful
        commit transaction
    else
        rollback transaction
close session
What I want to achieve: if query1 or query2 fails, it should just roll back the transaction and close the session.
Query 1 can also throw a CustomException if the result from Query1 is incorrect (less than some threshold). In this case it should roll back the transaction. I am rolling back the transaction in the exceptionally block for each query.
The happy path works fine in the code below, but when I throw CustomException, the Query2 block is not called and even the CompletableFuture.allOf callback is never called.
CompletableFuture<String> firstFuture = new CompletableFuture<>();
CompletableFuture<String> secondFuture = new CompletableFuture<>();
CompletableFuture<String> lastFuture = new CompletableFuture<>();

// Lambda that executes the transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {
    // Write Query 1
    transaction.runAsync("DB WRITE QUERY1")
        .thenCompose(someFunctionThatReturnsCompletionStage)
        .thenApply(val -> {
            // throw CustomException if the value is less than the threshold
            if (val < threshold) {
                throw new CustomException("Incorrect value found");
            } else {
                // if the value is correct then complete the future
                firstFuture.complete(val);
            }
            return val;
        }).exceptionally(error -> {
            // Since the failure occurred in Query 1, roll back
            transaction.rollbackAsync();
            firstFuture.completeExceptionally(error);
            throw new RuntimeException("There has been an error in first query " + error.getMessage());
        });

    // after the first write query is done, run the second write query
    firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
        .thenCompose(someFunctionThatReturnsCompletionStage)
        .thenApply(val -> {
            // if the value is correct then complete the future
            secondFuture.complete(val);
            return val;
        }).exceptionally(error -> {
            // In case of failure in Query 2, roll back
            transaction.rollbackAsync();
            secondFuture.completeExceptionally(error);
            throw new RuntimeException("There has been an error in second query " + error.getMessage());
        });

    // wait for both to complete and then complete the last future
    CompletableFuture.allOf(firstFuture, secondFuture)
        .handle((empty, ex) -> {
            if (ex != null) {
                lastFuture.completeExceptionally(ex);
            } else {
                // commit the transaction
                transaction.commitAsync();
                lastFuture.complete("OK");
            }
            return lastFuture;
        });
    return lastFuture;
};
// Create a database session
Session session = driver.session();
// runTransactionWork is a lambda that has access to the transaction
session.writeTransactionAsync(runTransactionWork)
    .handle((val, err) -> {
        if (val != null) {
            session.closeAsync();
            // send message to some broker about success
        } else {
            // fail logic
        }
        return val;
    });
How can I short-circuit on the exception, so that the transaction is rolled back and control goes directly to the exception block on the session?
These are my observations about the code blocks that are called based on different use cases, note these are based on debug points that I have placed in the code -
I want #2 and #3 to work as well and the respective transaction should be rolled back and session should be closed.
My question is: why does the exception branch of handle on allOf not get called when one of the futures completes exceptionally?
Upvotes: 1
Views: 879
Reputation: 120848
When you throw that CustomException, firstFuture is not completed. As a matter of fact, nothing happens to it. Because it is not completed (successfully), this:
firstFuture.thenCompose...
will not be executed. The documentation of thenCompose says:
When this stage completes normally, the given function is invoked with this stage's result as the argument...
Since this is not the case, that code is obviously not going to be triggered. Because of that, nothing in turn happens to secondFuture either, so CompletableFuture::allOf has exactly nothing to do. Maybe a simplified example will help:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class CF {

    public static void main(String[] args) {
        // db1 throws, so "one" completes exceptionally
        CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
        System.out.println(one.isCompletedExceptionally());

        // thenRun is only triggered when "one" completes normally
        CompletableFuture<Void> two = one.thenRun(CF::db2);

        System.out.println("first is done : " + FIRST_FUTURE.isDone());
        System.out.println("second is done : " + SECOND_FUTURE.isDone());

        // neither input is ever completed, so this callback never runs
        CompletableFuture.allOf(FIRST_FUTURE, SECOND_FUTURE).thenRun(() -> {
            System.out.println("allOf");
        });

        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
    }

    private static final boolean FAIL = true;
    private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
    private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();

    private static void db1() {
        if (FAIL) {
            // FIRST_FUTURE is left untouched on this path
            throw new RuntimeException("failed one");
        } else {
            FIRST_FUTURE.complete("42");
        }
    }

    private static void db2() {
        System.out.println("Running");
        SECOND_FUTURE.complete("42");
    }
}
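If you run this, you will notice that neither "Running" nor "allOf" gets printed: db2 is never invoked, and both FIRST_FUTURE and SECOND_FUTURE are reported as not done.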
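The same effect can be reproduced without threads or sleeps. The following is a minimal sketch; the class name SkippedStageDemo, the validate helper and the threshold of 10 are made up for illustration and are not part of the question's code. It shows that an exception thrown inside a callback fails only the stage returned by that callback, while a separately created CompletableFuture that was supposed to be completed inside it stays pending, and so does allOf over it, until the failure is forwarded explicitly.

```java
import java.util.concurrent.CompletableFuture;

public class SkippedStageDemo {

    // Hypothetical stand-in for the first query's callback: it completes
    // `manual` only when the value passes the check, otherwise it throws.
    static CompletableFuture<String> validate(int value, CompletableFuture<String> manual) {
        return CompletableFuture.completedFuture(value)
            .thenApply(val -> {
                if (val < 10) { // assumed threshold, for illustration only
                    throw new IllegalStateException("value below threshold");
                }
                manual.complete("ok");
                return "ok";
            });
    }

    public static void main(String[] args) {
        CompletableFuture<String> manual = new CompletableFuture<>();
        CompletableFuture<String> derived = validate(1, manual);

        // The derived stage carries the failure; the hand-made future is untouched.
        System.out.println("derived failed : " + derived.isCompletedExceptionally());
        System.out.println("manual done    : " + manual.isDone());

        // allOf waits for every input, so it is still pending here.
        CompletableFuture<Void> all = CompletableFuture.allOf(manual);
        System.out.println("allOf done     : " + all.isDone());

        // Forward the failure to the hand-made future; allOf then completes exceptionally.
        derived.whenComplete((ok, th) -> {
            if (th != null) {
                manual.completeExceptionally(th);
            }
        });
        System.out.println("allOf failed   : " + all.isCompletedExceptionally());
    }
}
```

Run as written, this should report the derived stage as failed, the manual future and allOf as not done, and allOf as failed only after the whenComplete callback has forwarded the exception.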
Unfortunately I am not familiar with Neo4j, but you can most probably adjust this example to your needs:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class CF {

    public static void main(String[] args) {
        CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);

        CompletableFuture<Void> terminal =
            one.whenComplete((ok, th) -> {
                if (th != null || FIRST_FUTURE.isCompletedExceptionally()) {
                    // no need to schedule the second one, need to rollback whatever the first one did
                    // transaction.rollbackAsync();
                    System.out.println("rollback because first one failed");
                    LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
                } else {
                    // the first one succeeded, so only now schedule the second one
                    CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
                    two.whenComplete((ok2, th2) -> {
                        if (th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
                            System.out.println("rollback because second one failed");
                            // transaction.rollbackAsync();
                            LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
                        } else {
                            LAST_FUTURE.complete("OK");
                        }
                    });
                }
            });

        // simulate that someone will call this
        terminal.join();
        // throws CompletionException when LAST_FUTURE was completed exceptionally
        System.out.println(LAST_FUTURE.join());
    }

    private static final boolean FAIL_ONE = false;
    private static final boolean FAIL_TWO = true;
    private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
    private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
    private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();

    private static void db1() {
        if (FAIL_ONE) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            RuntimeException ex = new RuntimeException("failed one");
            FIRST_FUTURE.completeExceptionally(ex);
        } else {
            FIRST_FUTURE.complete("42");
        }
    }

    private static void db2() {
        if (FAIL_TWO) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
            RuntimeException ex = new RuntimeException("failed two");
            SECOND_FUTURE.completeExceptionally(ex);
        } else {
            SECOND_FUTURE.complete("42");
        }
    }
}
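If the explicitly completed futures are not a hard requirement, the same control flow can probably be written as one chain, so that a failure anywhere skips the remaining steps and reaches a single terminal callback. The sketch below is an illustration under stated assumptions, not Neo4j code: FakeTx, query1, query2, finish and THRESHOLD are hypothetical stand-ins for the real transaction, queries and threshold.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ChainedTx {

    // Hypothetical stand-in for a transaction; it only records what happened.
    static class FakeTx {
        volatile String outcome = "open";

        CompletionStage<Void> commitAsync() {
            outcome = "committed";
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> rollbackAsync() {
            outcome = "rolled back";
            return CompletableFuture.completedFuture(null);
        }
    }

    static final int THRESHOLD = 10; // assumed value, for illustration only

    // Hypothetical queries: each returns a stage, as an async driver call would.
    static CompletionStage<Integer> query1(int result) {
        return CompletableFuture.supplyAsync(() -> result);
    }

    static CompletionStage<String> query2(int input) {
        return CompletableFuture.supplyAsync(() -> "second saw " + input);
    }

    // Single place that decides between commit and rollback.
    static CompletionStage<String> finish(FakeTx tx, String val, Throwable err) {
        if (err == null) {
            return tx.commitAsync().thenApply(ignored -> val);
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        // Whatever the rollback does, report the original error to the caller.
        tx.rollbackAsync().whenComplete((ignored, rollbackErr) -> failed.completeExceptionally(err));
        return failed;
    }

    static CompletionStage<String> run(FakeTx tx, int firstResult) {
        return query1(firstResult)
            .thenApply(val -> {
                if (val < THRESHOLD) {
                    throw new IllegalStateException("Incorrect value found");
                }
                return val;
            })
            .thenCompose(ChainedTx::query2)          // skipped when anything above failed
            .handle((val, err) -> finish(tx, val, err))
            .thenCompose(stage -> stage);            // flatten the nested stage
    }

    public static void main(String[] args) {
        // join() is used here only to keep the demo alive until the chain finishes.
        FakeTx good = new FakeTx();
        String ok = run(good, 42).toCompletableFuture().join();
        System.out.println(ok + " / " + good.outcome);

        FakeTx bad = new FakeTx();
        String res = run(bad, 1).exceptionally(err -> "failed").toCompletableFuture().join();
        System.out.println(res + " / " + bad.outcome);
    }
}
```

Because handle is invoked on both success and failure, the commit-or-rollback decision lives in one place, and a further handle or whenComplete on the stage returned by run would be a natural spot to close the session.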
Upvotes: 1