Reputation: 115
I have an issue with onErrorContinue() that the object passed to the Biconsumer is null.
I'm using Spring boot 2.1.13.RELEASE and reactive mongo with reactor-core version 3.2.15.RELEASE.
The problem happens when a call to the database to retrieve a record with the id returns no record and using the operator switchIfEmtpy() i throw an exception using Mono.error() and downstream i try to handle this exception using onErrorContinue().
The code below explains the issue:
public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(integer -> mapInteger(integer))
.doOnNext(System.out::println)
.onErrorContinue((throwable, o) -> System.out.println("error with " + o)) // o is null
.subscribe();
}
public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
return Mono.just(num)
.flatMap(t -> {
if (t == 5)
return Mono.empty();
else
return Mono.just(num * 2);
})
.switchIfEmpty(Mono.error(new RuntimeException("Error happened while mapping integer!")));
}
This will print the below values:
2
4
6
8
error with null
12
14
16
18
20
PS. I don't have the issue when another error happens in the stream.
Update: The mapInteger() was to simulate the below call to reactivemongo repository:
public Mono<MetaData> getFromDbByKey(String key) {
return repository
.findByKeyAndDeletedIsFalse(key)
.switchIfEmpty(Mono.error(() -> new RuntimeException()));
}
and the call to getFromDbByKey() returns this MetaData which i need to map it to the flux in the main flow.
with onErrorContinue, we catch the Throwable and based on its type we do different handling to each type of error.
Upvotes: 0
Views: 2777
Reputation: 115
Here is a solution/workaround that was suggested to me by a colleague:
public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(integer -> mapInteger(integer)
.switchIfEmpty(Mono.error(
new CustomException("Error happened while mapping integer!",
integer))))
.doOnNext(System.out::println)
.onErrorContinue((throwable, o) -> {
if (CustomException.class.isInstance(throwable))
System.out.println("Object to blame " + ((CustomException)throwable).getBlamedObject());
System.out.println("error with " + o);
})
.subscribe();
}
public static Mono<Integer> mapInteger(Integer num) {
return Mono.just(num)
.flatMap(t -> {
if (t == 5)
return Mono.empty();
else
return Mono.just(num * 2);
});
}
public static class CustomException extends RuntimeException {
private Integer blamedObject;
public CustomException(String message, Integer blamedObject) {
super(message);
this.blamedObject = blamedObject;
}
public Integer getBlamedObject() {
return blamedObject;
}
}
This will return:
2
4
6
8
Object to blame 5
error with null
12
14
16
18
20
The solution/workaround is to create a custom exception that accepts the object with the error and move the switchIfEmpty() to the main flow so that i can get the object that caused the problem and later in the onErrorContinue() i can check the exception type and get the object that caused the problem.
This works for me as the mapInteger() in the example is just a simulation to a reactive mongo repository which will return empty mono if there is no result found as updated in the original post.
Upvotes: 0
Reputation: 3671
You cant expect to receive the "item" t when you are returning Mono.empty() if t==5 ?
This code will print what you need.
Flux.range(1, 10)
.flatMap(integer -> mapInteger(integer))
.doOnNext(System.out::println)
.onErrorContinue((throwable, o) -> {
System.out.println(throwable.getMessage());
})
.subscribe();
}
public static Mono<Integer> mapInteger(Integer num) { // This is here to simulate the db call
return Mono.just(num)
.flatMap(t -> {
if (t == 5)
return Mono.empty();
else
return Mono.just(num * 2);
})
.switchIfEmpty(Mono.error(new RuntimeException("error with " + num)));
}
prints:
2
4
6
8
error with 5
12
14
16
18
20
And in your actual mongoDB call, you can have something like:
public Mono<MetaData> getFromDbByKey(String key) {
return repository
.findByKeyAndDeletedIsFalse(key)
.switchIfEmpty(Mono.error(() -> new RuntimeException("Couldnt find metadata which is not deleted for the key: " + key)));
}
Upvotes: 1