Reputation: 496
I'm struggling to understand the correct behaviour needed to implement a robust solution for PublishSubject in RxJava. I've read in various places that onError... is the correct solution, but I've not twigged where to place the code for this to be handled. Please see a simplified example as part of a JUnit test. The tests that highlight the issues are shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing and shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing which show that all processing for the bus stops after an exception is encountered.
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestClientException;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
@RunWith(MockitoJUnitRunner.class)
public class TestSubPubRobustness {
private static final Integer INTEGER_VALUE_OF_14 = Integer.valueOf(14);
private static final Integer INTEGER_VALUE_OF_12 = Integer.valueOf(12);
@Mock
private ValidatoryInterface mockInterface;
private static final Logger logger = LoggerFactory.getLogger(TestSubPubRobustness.class);
PublishSubject<Integer> subject;
Subject<Integer, Integer> inboundMessageBus;
@Before
public void setUp() throws Exception {
subject = PublishSubject.create();
inboundMessageBus = new SerializedSubject<>(subject);
}
@Test
public void shouldHandleSimplySuccessfulCall() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleMultipleSuccessfulCall() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleSimplySuccessfulCallForRetryVersion() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleErrorSuccessfulCallForRetryVersion() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleErrorSuccessfulCallForRetryVersionWithSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.onExceptionResumeNext(Observable.empty())
.subscribe(eventHandler);
doThrow(new RestClientException("error")).when(mockInterface)
.haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleMultipleSubsSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.retry().subscribe(eventHandler);
inboundMessageBus.retry().subscribe(additionalEventHandler);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
@Test
public void shouldHandleExceptionsForMultipleSubsSuccessfulOnesContinuing() {
final TestEventHandler eventHandler = new TestEventHandler(mockInterface);
final TestEventHandler additionalEventHandler = new TestEventHandler(mockInterface);
inboundMessageBus.asObservable()
.onErrorReturn(error -> Integer.MAX_VALUE)
.retry()
.subscribe(eventHandler);
inboundMessageBus.asObservable()
.onErrorReturn(error -> Integer.MAX_VALUE)
.retry()
.subscribe(additionalEventHandler);
doThrow(new RuntimeException()).when(mockInterface).haveBeenCalled(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_12);
inboundMessageBus.onNext(INTEGER_VALUE_OF_14);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_12);
verify(mockInterface, times(2)).haveBeenCalled(INTEGER_VALUE_OF_14);
verifyNoMoreInteractions(mockInterface);
}
private final class TestEventHandler extends Subscriber<Integer> {
private final ValidatoryInterface validatoryInterface;
public TestEventHandler(final ValidatoryInterface validatoryInterface) {
this.validatoryInterface = validatoryInterface;
}
@Override
public void onCompleted() {
logger.debug("Completed");
}
@Override
public void onError(final Throwable e) {
logger.error("Argggggggg", e);
}
@Override
public void onNext(final Integer t) {
logger.debug("Next", t);
validatoryInterface.haveBeenCalled(t);
}
}
private interface ValidatoryInterface {
void haveBeenCalled(Integer testNumber);
}
}
Upvotes: 1
Views: 164
Reputation: 18888
AFAIK you cannot resume an Observable once in an error state. There used to be an operator for this: onErrorFlatMap
. But it was deprecated in issue 1465. You can read there for a more thorough explanation, but basically the logic is that once an Observable is in error state it should never emit anything again. It's consumed. This ties to the contract of only delivering onError once and only once. You may "restart" by reconnecting to the source, but "recovering" is forbidden.
So an Observable will not behave like a message bus: it will stop delivering messages once it has delivered an error. The error handling operators never recover the stream, but instead restarts it or switches to other streams once errors happen.
In your example, you say onErrorReturn(e -> MAX_VALUE)
, which will replace the stream with a stream only containing MAX_VALUE once an error occurs in the source stream, effectively ending with MAX_VALUE if an error is encountered. Then you say retry()
, which means that the stream should be restarted if an error occurs. AFAIK these two are conflicting.
As far as advice go, I can only recommend to treat your streams as volatile. There are some workarounds like materialize
mentioned in the ticket, but they way I usually deal with it is to create new streams. In http servers, for instance, I create one stream per request instead of one stream handling all requests. This way exceptions are isolated to single requests.
It would be nice to hear if benjchristensen has any new ideas since that issue was written.
Upvotes: 1