Reputation: 38132
I'm trying to filter exceptions from a Flux/Mono with `onErrorContinue`, but it never gets triggered.
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
 * Reproduction case: three source fluxes are concatenated, and every one of
 * them, as well as the concatenated result, registers an
 * {@code onErrorContinue} handler for {@link IllegalArgumentException}.
 */
class FluxOnErrorContinueTest {

    @Test
    public void fluxTest() {
        // First source: a single value.
        Flux<String> fluxOne = Flux.just("one")
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (error, item) -> System.out.println("fluxOne: " + error.getMessage()));

        // Second source: signals an IllegalArgumentException instead of emitting values.
        Flux<String> fluxTwo = Flux.<String>error(() -> new IllegalArgumentException("error"))
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (error, item) -> System.out.println("fluxTwo: " +error.getMessage()));

        // Third source: three values.
        Flux<String> fluxThree = Flux.just("three", "four", "five")
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (error, item) -> System.out.println("fluxThree: " +error.getMessage()));

        // Chain the sources one after another.
        Flux<String> sourceFlux = fluxOne
                .concatWith(fluxTwo)
                .concatWith(fluxThree);

        // One more handler on the concatenated sequence as a whole.
        Flux<String> filteredFlux = sourceFlux
                .onErrorContinue(
                        IllegalArgumentException.class,
                        (error, item) -> System.out.println("filteredFlux: " + error.getMessage()));

        // Expectation: the four values ("one", "three", "four", "five"), then completion.
        StepVerifier.create(filteredFlux)
                .expectNextCount(4)
                .verifyComplete();
    }
}
I would expect to get the four strings and a log line on the console with the exception message, but instead I get:
12:15:15.191 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onError(java.lang.IllegalArgumentException: error))
at reactor.test.ErrorFormatter.assertionError(ErrorFormatter.java:105)
at reactor.test.ErrorFormatter.failPrefix(ErrorFormatter.java:94)
at reactor.test.ErrorFormatter.fail(ErrorFormatter.java:64)
at reactor.test.ErrorFormatter.failOptional(ErrorFormatter.java:79)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.checkCountMismatch(DefaultStepVerifierBuilder.java:1258)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignalCount(DefaultStepVerifierBuilder.java:1489)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1341)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1030)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onError(FluxContextStart.java:117)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1748)
at reactor.core.publisher.Operators.error(Operators.java:181)
at reactor.core.publisher.FluxErrorSupplied.subscribe(FluxErrorSupplied.java:61)
at reactor.core.publisher.Flux.subscribe(Flux.java:7921)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:207)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
at reactor.core.publisher.FluxContextStart.subscribe(FluxContextStart.java:49)
at reactor.core.publisher.Flux.subscribe(Flux.java:7921)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:801)
at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:772)
at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:644)
at com.myorg.FluxOnErrorContinueTest.fluxTest(FluxOnErrorContinueTest.java:21)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Suppressed: java.lang.IllegalArgumentException: error
at com.myorg.FluxOnErrorContinueTest.lambda$fluxTest$0(FluxOnErrorContinueTest.java:14)
at reactor.core.publisher.FluxErrorSupplied.subscribe(FluxErrorSupplied.java:60)
... 48 more
How can I filter exceptions (try-catch equivalent)?
I also tried with Mono and responses from WebClient.
The Javadoc mentions an "Error Mode Support" section, but I couldn't find that section. Do you have any links?
Upvotes: 1
Views: 1914
Reputation: 4365
The problem here is that `onErrorContinue` in the test is applied to the whole `sourceFlux`. What you probably need is to recover from the failure in each `Publisher` that is passed to `concatWith`, using the `onErrorResume` operator:
/**
 * Recovers from the failing publisher with {@code onErrorResume}, applied to
 * each concatenated publisher individually, so the remaining publishers
 * are still subscribed to.
 */
@Test
public void fluxTest() {
    // Log and swallow IllegalArgumentException; pass any other error through unchanged.
    Function<Throwable, Publisher<String>> fallback = failure -> {
        if (!(failure instanceof IllegalArgumentException)) {
            return Flux.error(failure);
        }
        System.out.println(failure.getMessage());
        return Flux.empty();
    };

    // Each concatenated publisher gets its own recovery step.
    Flux<String> failing = Flux.<String>error(() -> new IllegalArgumentException("error"))
            .onErrorResume(fallback);
    Flux<String> remaining = Flux.just("three", "four", "five")
            .onErrorResume(fallback);

    Flux<String> sourceFlux = Flux.just("one")
            .concatWith(failing)
            .concatWith(remaining);

    // "one" plus "three", "four", "five"; the failing publisher contributes nothing.
    StepVerifier.create(sourceFlux)
            .expectNextCount(4)
            .verifyComplete();
}
Or, if you only need to handle `IllegalArgumentException`:
/**
 * Same recovery as a manual type check, but the filtering by exception type is
 * delegated to the {@code onErrorResume(Class, Function)} overload.
 */
@Test
public void fluxTest() {
    // Only invoked for IllegalArgumentException: log the message and emit nothing.
    Function<Throwable, Publisher<String>> fallback = failure -> {
        System.out.println(failure.getMessage());
        return Flux.empty();
    };

    // Each concatenated publisher gets its own type-filtered recovery step.
    Flux<String> failing = Flux.<String>error(() -> new IllegalArgumentException("error"))
            .onErrorResume(IllegalArgumentException.class, fallback);
    Flux<String> remaining = Flux.just("three", "four", "five")
            .onErrorResume(IllegalArgumentException.class, fallback);

    Flux<String> sourceFlux = Flux.just("one")
            .concatWith(failing)
            .concatWith(remaining);

    // "one" plus "three", "four", "five"; the failing publisher contributes nothing.
    StepVerifier.create(sourceFlux)
            .expectNextCount(4)
            .verifyComplete();
}
Upvotes: 2