rdehuyss
rdehuyss

Reputation: 952

RxJava Eventbus and error handling

I'm trying to create an eventbus with rxjava on which I throw some commands that will continue even when errors happen. I was looking into onErrorFlatMap but that does not exist anymore and I can't get my head around materialize and dematerialize yet.

This is what I already have:

My little test

public class EventBusTest {

    private EventBus eventBus = new EventBus();

    @Test
    public void test() {
        //listeners in service layer
        eventBus.on(createOf(BeanA.class))
                .subscribe(this::handleBeanAInService1);
        eventBus.on(createOf(BeanA.class))
                .doOnNext(BeanA::validateBusinessLogic)
                .subscribe(this::handleBeanAInService2);
        eventBus.on(createOf(BeanB.class))
                .subscribe(this::handleBeanBInService);

        //incoming requests in rest layer
        List<String> incomingCalls = new ArrayList<>();
        Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
        incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
    }

    public void incomingCallsEgHttpCalls(String string) {
        Observable.just(string)
                .map(aName -> new BeanA(aName))
                .doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
                .subscribe(bean -> System.out.println("\tReturn ok to client: " + bean.getName()), error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
    }

    public void handleBeanAInService1(BeanA beanA) {
        System.out.println("BeanAService1 handling BeanA " + beanA.getName());
        if(beanA.getName().contains("ErrorInService")) {
            throw new RuntimeException("service exception for " + beanA.getName());
        }
        eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
    }

    public void handleBeanAInService2(BeanA beanA) {
        System.out.println("BeanAService2 handling BeanA " + beanA.getName());
    }

    public void handleBeanBInService(BeanB beanB) {
        System.out.println("BeanBService handling BeanB " + beanB.getName());
    }

}

EventBus

@Named
public class EventBus {

    private PublishSubject<Object> publishSubject = PublishSubject.create();

    public EventBus post(Object object) {
        //publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate

        //To prevent OnErrorNotImplementedException
        try {
            publishSubject.onNext(object);
        } catch (OnErrorNotImplementedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
        return this;
    }

    public Observable<Object> observe() {
        //I also tried
        //return publishSubject.onErrorResumeNext(publishSubject);
        return publishSubject;
    }

    public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
        return onCommand.call((Observable<F>) publishSubject);
    }

}

CreateCommand

public class CreateCommand {

    private Object object;

    public CreateCommand(Object object) {
        this.object = object;
    }

    public Class<?> type() {
        return object.getClass();
    }

    public <T> T value() {
        return (T) object;
    }

    public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
        return observable -> observable
                .ofType(CreateCommand.class)
                .filter(createCommand -> clazz.isInstance(createCommand.object))
                .map(createCommand -> clazz.cast(createCommand.object));
    }
}

BeanA

public class BeanA {
    private String name;

    public BeanA(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void validateBusinessLogic(BeanA beanA) {
        if (beanA.getName().contains("ErrorOnValidate")) {
            throw new RuntimeException("validate exception for " + beanA.getName());
        }
    }
}

BeanB

public class BeanB {

    private String name;

    public BeanB(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

Actual output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
    Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
    Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
    Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
    Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
    Return ok to client: fifthRequest

Wanted output

BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
    Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
    Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
    Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
    Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
    Return ok to client: fifthRequest

Any ideas how to solve it? I saw there was an issue in rxjava regarding examples for EventBus implementations but did not find it.

I'd also like that my services do not need to do some specifix rxjava error handling.

Or is there another way of pushing events to multiple other observables and then zipping the results again?

Upvotes: 1

Views: 968

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Errors in RxJava are terminal events which tear down the observable streams (unless captured/resumed by onErrorXXX and onException operators). If you want to keep the streams alive, you need to wrap your error into some other value type (i.e., Notification) and unwrap it at the relevant locations.

Here is an example gist that shows an eventbus implementation over RxJava constructs. Note the following things:

  • You may need to serialize access to the subject in case the post() method of the event bus can be called from multiple threads.
  • RxJava sometimes does not bounce back exceptions thrown from onNext but only at the very beginning of the stream, terminating the whole stream so you need an operator which confines such errors below a certain level, hence the ClientErrorBounceBack operator in the example.

Upvotes: 1

Related Questions