Prasanna Talakanti

Reputation: 2394

Reactive Java onErrorResume error handling

I am trying to save an event using the flow below (the repo is reactive; this is just example code for testing). I am new to reactive programming and am using io.projectreactor (3.3).

  1. Validate the event; if validation fails, write to history
  2. If validation succeeds, write the event to the repo; on any failure, write to history
  3. The code induces some failures to simulate the error condition

import reactor.core.publisher.Mono;

public class MyTest {

    static int counter = 0;


    public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                return Mono.empty();})
                .subscribe();
    }

    private Mono<String> onMessage(String input)
    {
        return Mono.create(sink -> {
            validate()
                    .onErrorResume(e -> {
                        System.out.println("error onMessage:: fail to validate");
                        sink.error(e);
                        return Mono.error(e);
                    })
                    .flatMap(a -> processObject(input))
                    .flatMap(h -> {
                        System.out.println("success onMessage :: save history");
                        new Service().saveHistory(input, false);
                        sink.success();
                        return Mono.just(h);
                      })
                     .subscribe();
        });

    }

    private Mono<String> processObject(String input)
    {
           return Mono.create(sink -> {
               new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   sink.success(a);
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                   sink.error(e);
                   return Mono.error(e);
               }).subscribe();
           });

    }

    private Mono<String> validate()
    {
        counter++;
        return Mono.create(sink -> {
            if (counter % 3 == 0)
            {
                sink.error(new RuntimeException("Validate method error"));
                return;
            }
            sink.success("validate is done ");
            return;
        });

    }


}

Service Class

public class Service {


    public Mono<String> saveEvent(String id)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save event");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save event"));

    }

    public Mono<String> saveHistory(String id, boolean error)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save history");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save history"));

    }

    public Mono<String> save(String id)
    {

        if (id  == null)
        {
            throw new RuntimeException("Error saving");
        }

        return Mono.just("save success");
    }

}

I am getting this exception:

---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
    at sample.MyTest.lambda$validate$9(MyTest.java:77)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.lambda$onMessage$5(MyTest.java:49)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.createMessage(MyTest.java:30)
    at sample.MyTest.main(MyTest.java:18)

Updated working code, based on @Michael Berry's comments:

 public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {
                    System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                   return Mono.empty();
                })
                .subscribe();
    }

    private Mono<String> onMessage(String input) {
        return validate()
                .onErrorResume(e -> {
                    System.out.println("error onMessage:: fail to validate");
                    return Mono.error(e);
                })
                .flatMap(a -> processObject(input))
                .flatMap(h -> {
                    System.out.println("success onMessage :: save history");
                    new Service().saveHistory(input, false);
                    return Mono.just(h);
                });
    }

    private Mono<String> processObject(String input)
    {
           return new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                    return Mono.error(e);
               });

    }

    private Mono<String> validate()
    {
        counter++;

            if (counter % 3 == 0)
            {
                return Mono.error(new RuntimeException("Validate method error"));

            }
          return  Mono.just("validate is done ");



    }

Result

input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error

Upvotes: 1

Views: 2653

Answers (1)

Michael Berry

Reputation: 72294

You're getting an error here because of your onMessage() implementation, which is a bit bizarre:

  • You're wrapping a Mono in Mono.create(), which there's no reason to do.
  • You're subscribing to this inner publisher yourself. That's almost always the wrong thing to do and won't necessarily do what you expect (subscribing to publishers should be handled by the framework, not your code). The key point here is that the inner publisher is treated separately rather than as part of your reactive chain, so the error handling on the outer chain doesn't apply to it.
  • Your onErrorResume() call on this inner publisher itself returns an error, and the inner subscribe() call has no error handler of its own. That error is therefore unhandled, which is why you see the stack trace.
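
To make the last two points concrete, here is a minimal sketch (not the asker's code) contrasting a composed chain with a publisher that is subscribed separately. The class name and the simplified validate(boolean) helper are assumptions for illustration only. In the composed version a single downstream onErrorResume() sees the failure; in the detached version the inner publisher only reports its error to a handler passed to its own subscribe() call.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;

public class ComposedVsDetached {

    // Hypothetical stand-in for validate(): fails on demand.
    static Mono<String> validate(boolean fail) {
        return fail
                ? Mono.<String>error(new RuntimeException("Validate method error"))
                : Mono.just("validate is done");
    }

    // Composed: every step is part of one chain, so the error travels
    // downstream to the single onErrorResume() at the end.
    static String composed() {
        return validate(true)
                .flatMap(v -> Mono.just("processed"))
                .onErrorResume(e -> Mono.just("recovered: " + e.getMessage()))
                .block();
    }

    // Detached: this publisher is subscribed on its own, so only the error
    // consumer given to this subscribe() call is notified of the failure.
    static String detached() {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        validate(true).subscribe(
                v -> seen.set("value: " + v),
                e -> seen.set("inner error: " + e.getMessage()));
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(composed()); // recovered: Validate method error
        System.out.println(detached()); // inner error: Validate method error
    }
}
```

The question's inner subscribe() call passes no error consumer at all, so nothing in that position receives the re-emitted error.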

Instead, you most likely want your onMessage() method to look like this:

private Mono<String> onMessage(String input) {
    return validate()
            .onErrorResume(e -> {
                System.out.println("error onMessage:: fail to validate");
                return Mono.error(e);
            })
            .flatMap(a -> processObject(input))
            .flatMap(h -> {
                System.out.println("success onMessage :: save history");
                new Service().saveHistory(input, false);
                return Mono.just(h);
            });
}

...without the Mono.create() (which is really only meant for bridging non-Reactor callback APIs, for compatibility purposes). Your output with this change then reads as follows:

input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
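
One thing worth noticing in the output above: "save history" is never printed. The Mono returned by saveHistory() is created inside the flatMap() but never subscribed, so the doOnNext() in it never runs. If the history write is meant to happen, a possible fix is to return it as part of the chain. The sketch below assumes simplified stand-ins for validate() and saveHistory() (the class name, the log list and the boolean parameter on validate() are hypothetical), and also shows one way to write history when validation fails before re-emitting the error.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class ChainedHistoryDemo {

    // Records which history writes actually ran (illustration only).
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for Service.saveHistory(): the write only
    // happens when the returned Mono is subscribed.
    static Mono<String> saveHistory(String id, boolean error) {
        return Mono.fromCallable(() -> {
            log.add("history " + id + " error=" + error);
            return "save success";
        });
    }

    // Hypothetical stand-in for validate(): fails on demand.
    static Mono<String> validate(boolean fail) {
        return fail
                ? Mono.<String>error(new RuntimeException("Validate method error"))
                : Mono.just("validate is done");
    }

    // Same shape as the snippet above: the Mono is built but never subscribed.
    static Mono<String> notChained(String input) {
        return validate(false).flatMap(h -> {
            saveHistory(input, false); // result discarded, nothing runs
            return Mono.just(h);
        });
    }

    // History writes are returned from the lambdas, so they are subscribed
    // as part of the chain on both the failure and the success path.
    static Mono<String> chained(String input, boolean failValidation) {
        return validate(failValidation)
                // on failure: write history, then re-emit the original error
                .onErrorResume(e -> saveHistory(input, true).then(Mono.<String>error(e)))
                // on success: write history, then pass the value along
                .flatMap(h -> saveHistory(input, false).thenReturn(h));
    }

    public static void main(String[] args) {
        notChained("1").block();
        System.out.println(log); // []
        chained("1", false).block();
        System.out.println(log); // [history 1 error=false]
        chained("2", true).onErrorResume(e -> Mono.empty()).block();
        System.out.println(log); // [history 1 error=false, history 2 error=true]
    }
}
```

The same idea applies to the saveHistory(input, true) call inside processObject(): returning it from the onErrorResume() lambda, rather than calling it and discarding the result, is what makes it run.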

Upvotes: 1
