Reputation: 2394
Trying to save an event follows this flow (the repo is reactive; this is just example code for testing. I am new to reactive programming and I am using io.projectreactor 3.3):
import reactor.core.publisher.Mono;
public class MyTest {
static int counter = 0;
public static void main(String args[]) throws InterruptedException
{
String array[] = {"1","2","3","4",null,"5"};
for(int i =0; i < 5; i++)
{
System.out.println("input:: "+array[i]);
new MyTest().createMessage(array[i]);
counter++;
Thread.sleep(500);
}
}
private void createMessage(String input)
{
new MyTest().onMessage(input)
.doOnSuccess(s -> System.out.println("----done::success-----"))
.onErrorResume(e ->
{System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
return Mono.empty();})
.subscribe();
}
private Mono<String> onMessage(String input)
{
return Mono.create(sink -> {
validate()
.onErrorResume(e -> {
System.out.println("error onMessage:: fail to validate");
sink.error(e);
return Mono.error(e);
})
.flatMap(a -> processObject(input))
.flatMap(h -> {
System.out.println("success onMessage :: save history");
new Service().saveHistory(input, false);
sink.success();
return Mono.just(h);
})
.subscribe();
});
}
private Mono<String> processObject(String input)
{
return Mono.create(sink -> {
new Service().saveEvent(input).flatMap(a -> {
System.out.println("success processObject");
sink.success(a);
return Mono.just(a);
}).onErrorResume(e -> {
new Service().saveHistory(input, true);
System.out.println("error processObject");
sink.error(e);
return Mono.error(e);
}).subscribe();
});
}
private Mono<String> validate()
{
counter++;
return Mono.create(sink -> {
if (counter % 3 == 0)
{
sink.error(new RuntimeException("Validate method error"));
return;
}
sink.success("validate is done ");
return;
});
}
}
Service Class
public class Service {
public Mono<String> saveEvent(String id)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save event");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save event"));
}
public Mono<String> saveHistory(String id, boolean error)
{
return save(id)
.onErrorResume(e -> {
System.out.println("Error in save history");
return Mono.error(e);
}).doOnNext(e -> System.out.println("save history"));
}
public Mono<String> save(String id)
{
if (id == null)
{
throw new RuntimeException("Error saving");
}
return Mono.just("save success");
}
}
I am getting this exception
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
at sample.MyTest.lambda$validate$9(MyTest.java:77)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.lambda$onMessage$5(MyTest.java:49)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
at sample.MyTest.createMessage(MyTest.java:30)
at sample.MyTest.main(MyTest.java:18)
Updated working code : based on @Michael Berry comments
/**
 * Test driver: sends the first five sample inputs through createMessage, bumping the shared
 * counter and sleeping 500 ms after each one.
 *
 * @param args unused
 * @throws InterruptedException if the sleep between inputs is interrupted
 */
public static void main(String args[]) throws InterruptedException
{
    final String inputs[] = {"1","2","3","4",null,"5"};
    int index = 0;
    while (index < 5)
    {
        final String current = inputs[index];
        System.out.println("input:: "+current);
        new MyTest().createMessage(current);
        counter++;
        Thread.sleep(500);
        index++;
    }
}
/**
 * Builds the pipeline for one input, attaches the success/error reporting, and subscribes.
 *
 * @param input the value handed to onMessage
 */
private void createMessage(String input)
{
    final Mono<String> pipeline = new MyTest().onMessage(input);
    final Mono<String> reported = pipeline
        .doOnSuccess(ignored -> System.out.println("----done::success-----"))
        .onErrorResume(failure ->
        {
            System.out.println("---done::error --creatMessage::doOnError:: caused by "+failure);
            // Swallow the error after reporting it so the subscription completes normally.
            return Mono.empty();
        });
    reported.subscribe();
}
/**
 * Validates, processes the input, then saves a history entry.
 *
 * @param input the value to process
 * @return a Mono emitting the value produced by processObject, or the first error raised
 */
private Mono<String> onMessage(String input) {
    return validate()
        // Log only; the error keeps propagating to the subscriber's handler.
        .doOnError(e -> System.out.println("error onMessage:: fail to validate"))
        .flatMap(a -> processObject(input))
        .flatMap(h -> {
            System.out.println("success onMessage :: save history");
            // Return the history Mono so it is subscribed as part of this chain;
            // previously it was built and discarded, so the save never ran.
            return new Service().saveHistory(input, false).thenReturn(h);
        });
}
/**
 * Saves the event; on failure records a history entry and re-emits the original error.
 *
 * @param input the value to save
 * @return a Mono emitting the save result, or the save error
 */
private Mono<String> processObject(String input)
{
    final Service service = new Service();
    // defer() converts an exception thrown while the save Mono is being built into an error
    // signal, so the onErrorResume below handles it as well.
    return Mono.defer(() -> service.saveEvent(input))
        .doOnNext(a -> System.out.println("success processObject"))
        .onErrorResume(e -> {
            System.out.println("error processObject");
            // Chain the history save so it is actually subscribed, then re-emit the original error.
            return Mono.defer(() -> service.saveHistory(input, true))
                .onErrorResume(historyError -> {
                    // Keep the original failure as the primary error.
                    if (historyError != e) {
                        e.addSuppressed(historyError);
                    }
                    return Mono.empty();
                })
                .then(Mono.<String>error(e));
        });
}
/**
 * Increments the shared counter and reports whether this call passes validation.
 *
 * @return a Mono emitting a confirmation string, or an error Mono when the counter is a
 *     multiple of 3 after the increment
 */
private Mono<String> validate()
{
    counter += 1;
    final boolean passes = counter % 3 != 0;
    if (passes)
    {
        return Mono.just("validate is done ");
    }
    return Mono.error(new RuntimeException("Validate method error"));
}
Result
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Upvotes: 1
Views: 2653
Reputation: 72294
You're getting an error here because of your onMessage() implementation, which is a bit bizarre:
- You wrap a Mono in Mono.create(), which there's no reason to do;
- The onErrorResume() call on this inner publisher itself returns an error, and there's no other error handling on this inner publisher - which is why that error is unhandled and you get the stack trace that you're seeing.
Instead, you most likely want your onMessage() method to read thus:
/**
 * Validates, processes the input, then saves a history entry, all as one chain with no
 * inner subscribe().
 *
 * @param input the value to process
 * @return a Mono emitting the value produced by processObject, or the first error raised
 */
private Mono<String> onMessage(String input) {
    return validate()
        .onErrorResume(e -> {
            System.out.println("error onMessage:: fail to validate");
            // Re-emit so the subscriber's error handling still sees the failure.
            return Mono.error(e);
        })
        .flatMap(a -> processObject(input))
        .flatMap(h -> {
            System.out.println("success onMessage :: save history");
            // Return the history Mono so it is subscribed as part of this chain;
            // a Mono that is built and discarded never runs.
            return new Service().saveHistory(input, false).thenReturn(h);
        });
}
...without the Mono.create()
(which is only really meant to be used by non-reactor callback APIs for compatibility purposes.) Your output with this change then reads as follows:
input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Upvotes: 1