cheddarDev

Reputation: 242

Short circuiting the chain of CompletionStage

I am using Java 8 and I have a chain of CompletionStage that I am trying to run.

I don't want to use join() or get(); I want to explicitly complete the CompletionStage.

I am trying to run two database queries, where the second depends on the result of the first. I start a database transaction using a session, run write query1, then write query2, and only if both are successful do I want to commit the transaction; otherwise I want to roll it back. The transaction and session are part of the Neo4j Java API: https://neo4j.com/docs/api/java-driver/current/org/neo4j/driver/async/AsyncSession.html#writeTransactionAsync-org.neo4j.driver.async.AsyncTransactionWork-

After both queries have run, whether they succeeded or failed, I want to close the session (a standard database practice).

Here is the pseudocode:

DB Session starts transaction
    run Write Query1
    run Write Query2
    
    if both are successful
       commit transaction
    else
       rollback transaction
close session

What I want to achieve is that if query1 or query2 fails, the transaction is rolled back and the session is closed.

Query 1 can also throw a CustomException if its result is incorrect (less than some threshold). In this case the transaction should be rolled back as well. I am rolling back the transaction in the exceptionally block of each query.

The happy path works fine in the code below, but when I throw CustomException, the Query2 block is not called and even the CompletableFuture.allOf callback is never called.

CompletableFuture<String> firstFuture = new CompletableFuture<>();
CompletableFuture<String> secondFuture = new CompletableFuture<>();
CompletableFuture<String> lastFuture = new CompletableFuture<>();


//Lambda that executes transaction
TransactionWork<CompletionStage<String>> runTransactionWork = transaction -> {

     //Write Query1
       transaction.runAsync("DB WRITE QUERY1") //Running Write Query 1
              .thenCompose(someFunctionThatReturnsCompletionStage)
              .thenApply(val -> {
                     //throw CustomException if value less then threshold
                     if(val < threshold){
                         throw new CustomException("Incorrect value found");
                     }else{
                       //if value is correct then complete future
                       firstFuture.complete(val);
                     }
                  return val;
              }).exceptionally(error -> {
                        //Since failure occured in Query1 want to roll back
                        transaction.rollbackAsync();
                        firstFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in first query " + error.getMessage());
                  });

         //after the first write query is done then run the second write query
         firstFuture.thenCompose(val -> transaction.runAsync("DB Write QUERY 2"))
                   .thenCompose(someFunctionThatReturnsCompletionStage)
                   .thenApply(val -> {
                       //if value is correct then complete
                       secondFuture.complete(val);
                       return val;
                   }).exceptionally(error -> {
                        //Incase of failure in Query2 want to roll back
                        transaction.rollbackAsync();
                        secondFuture.completeExceptionally(error);
                        throw new RuntimeException("There has been an error in second query " + error.getMessage());
                  });


   //wait for both to complete and then complete the last future
   CompletableFuture.allOf(firstFuture, secondFuture)
                    .handle((empty, ex) -> {
                        if(ex != null){
                            lastFuture.completeExceptionally(ex);
                        }else{
                            //commit the transaction
                            transaction.commitAsync();
                            lastFuture.complete("OK");
                        }

                        return lastFuture;
                    });

            return lastFuture;
};

 //Create a database session
 Session session = driver.session();

 //runTransactionWork is lambda that has access to transaction
 session.writeTransactionAsync(runTransactionWork)
      .handle((val, err) -> {
         if(val != null){
            session.closeAsync();
            //send message to some broker about success
         }else{
            //fail logic 
         }
      });


How can I short-circuit on an exception so that the transaction is rolled back and control goes directly to the failure branch of the session's handle block?

These are my observations about which code blocks are called in the different cases, based on breakpoints I placed in the code:

  1. Happy path - firstFuture(success) -> secondFuture(success) -> LastFuture (success) -> session block success called (works fine)
  2. First Future fail - firstFuture(failed due to exception) -> secondFuture(never called) -> LastFuture(never called) -> session block failure(never called)
  3. Second Future fail - firstFuture(success) -> secondFuture(failed due to exception) -> LastFuture(never called) -> session block failure(never called)

I want #2 and #3 to work as well: the respective transaction should be rolled back and the session should be closed.

My question is: why does the exception branch of the handle on allOf not get called when one of the futures completes exceptionally?

Upvotes: 1

Views: 879

Answers (1)

Eugene

Reputation: 120848

When you throw that CustomException, firstFuture is not completed. As a matter of fact, nothing happens to it. Because it is not completed (successfully), this:

firstFuture.thenCompose...

will not be executed. The documentation of thenCompose says:

When this stage completes normally, the given function is invoked with this stage's result as the argument...

Since this is not the case, that code is not going to be triggered. Because of that, nothing happens to secondFuture either, so CompletableFuture::allOf has nothing to do: it only completes once every future passed to it has completed. Maybe a simplified example will help:

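import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class CF {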

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
    System.out.println(one.isCompletedExceptionally());

    CompletableFuture<Void> two = one.thenRun(CF::db2);

    System.out.println("first is done : " + FIRST_FUTURE.isDone());
    System.out.println("second is done : " + SECOND_FUTURE.isDone());
    CompletableFuture.allOf(FIRST_FUTURE, SECOND_FUTURE).thenRun(() -> {
      System.out.println("allOf");
    });
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
  }

  private static final boolean FAIL = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL) {
      throw new RuntimeException("failed one");
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    System.out.println("Running");
    SECOND_FUTURE.complete("42");
  }

}

If you run this, you will notice that neither "Running" nor "allOf" gets printed: db2 never runs and the allOf stage never completes.
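To make the difference between thenCompose and handle concrete, here is a minimal sketch that needs no threads or sleeping. The class name SkipOnFailureDemo and the method callbacksRun are made up for illustration; only the CompletableFuture calls are standard Java 8 API. It records which callbacks actually fire when the first stage has already failed and a second future is never completed by anyone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SkipOnFailureDemo {

  // Returns the names of the callbacks that actually ran.
  static List<String> callbacksRun() {
    List<String> ran = new ArrayList<>();

    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("query 1 failed"));

    // Skipped: thenCompose only invokes its function on normal completion.
    CompletableFuture<String> next = failed.thenCompose(v -> {
      ran.add("thenCompose");
      return CompletableFuture.completedFuture(v);
    });

    // Runs: handle is invoked for results and for exceptions alike.
    next.handle((v, ex) -> {
      ran.add("handle");
      return null;
    });

    // A future that nobody completes stays pending, so allOf over it never fires,
    // even though the other input has already failed.
    CompletableFuture<String> neverCompleted = new CompletableFuture<>();
    CompletableFuture.allOf(failed, neverCompleted).handle((v, ex) -> {
      ran.add("allOf");
      return null;
    });

    return ran;
  }

  public static void main(String[] args) {
    System.out.println(callbacksRun()); // prints [handle]
  }
}
```

Only the handle attached to the dependent stage runs. The failure does propagate through the stage returned by thenCompose, but a separately created future only changes state if some callback explicitly completes it.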


Unfortunately I am not familiar with Neo4j, but you can most probably adjust this example to your needs:

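import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class CF {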

  public static void main(String[] args) {
    CompletableFuture<Void> one = CompletableFuture.runAsync(CF::db1);

    CompletableFuture<Void> terminal =
    one.whenComplete((ok, th) -> {
      if(th != null || FIRST_FUTURE.isCompletedExceptionally()) {
        // no need to schedule the second one, need to rollback whatever the first one did
        // transaction.rollbackAsync();
        System.out.println("rollback because first one failed");
        LAST_FUTURE.completeExceptionally(new RuntimeException("because first one failed"));
      } else {
        CompletableFuture<Void> two = CompletableFuture.runAsync(CF::db2);
        two.whenComplete((ok2, th2) -> {
          if(th2 != null || SECOND_FUTURE.isCompletedExceptionally()) {
            System.out.println("rollback because second one failed");
            // transaction.rollbackAsync();
            LAST_FUTURE.completeExceptionally(new RuntimeException("because second one failed"));
          } else {
            LAST_FUTURE.complete("OK");
          }
        });
      }
    });

    // simulate that someone will call this
    terminal.join();
    System.out.println(LAST_FUTURE.join());

  }

  private static final boolean FAIL_ONE = false;
  private static final boolean FAIL_TWO = true;
  private static final CompletableFuture<String> FIRST_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> SECOND_FUTURE = new CompletableFuture<>();
  private static final CompletableFuture<String> LAST_FUTURE = new CompletableFuture<>();

  private static void db1() {
    if(FAIL_ONE) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed one");
      FIRST_FUTURE.completeExceptionally(ex);
    } else {
      FIRST_FUTURE.complete("42");
    }
  }

  private static void db2() {
    if(FAIL_TWO) {
      LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500));
      RuntimeException ex = new RuntimeException("failed two");
      SECOND_FUTURE.completeExceptionally(ex);
    } else {
      SECOND_FUTURE.complete("42");
    }
  }

}
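
An alternative worth considering is to drop the hand-completed futures entirely and keep everything in one chain, so that a failure anywhere skips the remaining steps and ends up in a single handle at the end. The following is only a sketch under assumptions: FakeTx is a hypothetical stand-in that records calls and completes immediately (it is not the Neo4j transaction type), and work, finish and run are illustrative names. It uses only Java 8 CompletionStage/CompletableFuture methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

class SingleChainSketch {

  // Hypothetical stand-in for a transaction: records calls, completes immediately.
  static class FakeTx {
    final List<String> log = new ArrayList<>();

    CompletionStage<Integer> runAsync(String query, int result, boolean fail) {
      log.add(query);
      CompletableFuture<Integer> f = new CompletableFuture<>();
      if (fail) {
        f.completeExceptionally(new RuntimeException(query + " failed"));
      } else {
        f.complete(result);
      }
      return f;
    }

    CompletionStage<Void> commitAsync() {
      log.add("commit");
      return CompletableFuture.completedFuture(null);
    }

    CompletionStage<Void> rollbackAsync() {
      log.add("rollback");
      return CompletableFuture.completedFuture(null);
    }
  }

  // One chain: a failure anywhere skips the remaining thenApply/thenCompose
  // steps and lands in the single handle() at the end.
  static CompletionStage<String> work(FakeTx tx, int q1Result, int threshold, boolean failQ2) {
    return tx.runAsync("query1", q1Result, false)
        .thenApply(val -> {
          if (val < threshold) {
            throw new IllegalStateException("Incorrect value found");
          }
          return val;
        })
        .thenCompose(val -> tx.runAsync("query2", val, failQ2))
        .handle((val, ex) -> finish(tx, ex))
        .thenCompose(stage -> stage); // flatten the nested stage
  }

  // Commit on success; on failure roll back, then re-raise the original cause.
  static CompletionStage<String> finish(FakeTx tx, Throwable ex) {
    if (ex == null) {
      return tx.commitAsync().thenApply(ignored -> "OK");
    }
    Throwable cause =
        (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    return tx.rollbackAsync().<String>thenApply(ignored -> {
      throw new CompletionException(cause);
    });
  }

  // Runs one scenario and reports the outcome plus the recorded calls.
  static String run(int q1Result, int threshold, boolean failQ2) {
    FakeTx tx = new FakeTx();
    String outcome = work(tx, q1Result, threshold, failQ2)
        .handle((val, err) -> err == null ? val : "FAILED")
        .toCompletableFuture()
        .join();
    return outcome + " " + tx.log;
  }

  public static void main(String[] args) {
    System.out.println(run(10, 5, false)); // OK [query1, query2, commit]
    System.out.println(run(1, 5, false));  // FAILED [query1, rollback]
    System.out.println(run(10, 5, true));  // FAILED [query1, query2, rollback]
  }
}
```

The join() in run is only there so the sketch can print a result; in the real code the returned stage would be handed back to the caller, and the session's handle block would see either "OK" or the propagated exception and could close the session in both branches.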

Upvotes: 1
