Reputation: 952
I'm trying to create an event bus with RxJava onto which I post commands, and which keeps delivering events even when errors happen. I looked into onErrorFlatMap, but it no longer exists, and I can't get my head around materialize and dematerialize yet.
This is what I already have:
My little test
public class EventBusTest {

    private EventBus eventBus = new EventBus();

    @Test
    public void test() {
        //listeners in service layer
        eventBus.on(createOf(BeanA.class))
                .subscribe(this::handleBeanAInService1);
        eventBus.on(createOf(BeanA.class))
                .doOnNext(BeanA::validateBusinessLogic)
                .subscribe(this::handleBeanAInService2);
        eventBus.on(createOf(BeanB.class))
                .subscribe(this::handleBeanBInService);

        //incoming requests in rest layer
        List<String> incomingCalls = new ArrayList<>();
        Collections.addAll(incomingCalls, "firstRequest", "secondRequestErrorOnValidate", "thirdRequest", "fourthRequestErrorInService", "fifthRequest");
        incomingCalls.stream().forEach(this::incomingCallsEgHttpCalls);
    }

    public void incomingCallsEgHttpCalls(String string) {
        Observable.just(string)
                .map(aName -> new BeanA(aName))
                .doOnNext(bean -> eventBus.post(new CreateCommand(bean)))
                .subscribe(
                        bean -> System.out.println("\tReturn ok to client: " + bean.getName()),
                        error -> System.out.println("\tReturning error to client: " + string + "; " + error.getMessage()));
    }

    public void handleBeanAInService1(BeanA beanA) {
        System.out.println("BeanAService1 handling BeanA " + beanA.getName());
        if (beanA.getName().contains("ErrorInService")) {
            throw new RuntimeException("service exception for " + beanA.getName());
        }
        eventBus.post(new CreateCommand(new BeanB(beanA.getName())));
    }

    public void handleBeanAInService2(BeanA beanA) {
        System.out.println("BeanAService2 handling BeanA " + beanA.getName());
    }

    public void handleBeanBInService(BeanB beanB) {
        System.out.println("BeanBService handling BeanB " + beanB.getName());
    }
}
EventBus
@Named
public class EventBus {

    private PublishSubject<Object> publishSubject = PublishSubject.create();

    public EventBus post(Object object) {
        //publishSubject.onNext(object); => does not work, OnErrorNotImplementedExceptions if eventbus observers throw errors in validate
        //To prevent OnErrorNotImplementedException
        try {
            publishSubject.onNext(object);
        } catch (OnErrorNotImplementedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(cause);
        }
        return this;
    }

    public Observable<Object> observe() {
        //I also tried
        //return publishSubject.onErrorResumeNext(publishSubject);
        return publishSubject;
    }

    public <F, T> Observable<T> on(Observable.Transformer<F, T> onCommand) {
        return onCommand.call((Observable<F>) publishSubject);
    }
}
CreateCommand
public class CreateCommand {

    private Object object;

    public CreateCommand(Object object) {
        this.object = object;
    }

    public Class<?> type() {
        return object.getClass();
    }

    public <T> T value() {
        return (T) object;
    }

    public static <F, T> Observable.Transformer<F, T> createOf(Class<T> clazz) {
        return observable -> observable
                .ofType(CreateCommand.class)
                .filter(createCommand -> clazz.isInstance(createCommand.object))
                .map(createCommand -> clazz.cast(createCommand.object));
    }
}
BeanA
public class BeanA {

    private String name;

    public BeanA(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void validateBusinessLogic(BeanA beanA) {
        if (beanA.getName().contains("ErrorOnValidate")) {
            throw new RuntimeException("validate exception for " + beanA.getName());
        }
    }
}
BeanB
public class BeanB {

    private String name;

    public BeanB(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}
Actual output
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
Return ok to client: fifthRequest
Wanted output
BeanAService1 handling BeanA firstRequest
BeanBService handling BeanB firstRequest
BeanAService2 handling BeanA firstRequest
Return ok to client: firstRequest
BeanAService1 handling BeanA secondRequestErrorOnValidate
BeanBService handling BeanB secondRequestErrorOnValidate
Returning error to client: secondRequestErrorOnValidate; validate exception for secondRequestErrorOnValidate
BeanAService1 handling BeanA thirdRequest
BeanBService handling BeanB thirdRequest
BeanAService2 handling BeanA thirdRequest
Return ok to client: thirdRequest
BeanAService1 handling BeanA fourthRequestErrorInService
BeanAService2 handling BeanA fourthRequestErrorInService
Returning error to client: fourthRequestErrorInService; service exception for fourthRequestErrorInService
BeanAService1 handling BeanA fifthRequest
BeanBService handling BeanB fifthRequest
BeanAService2 handling BeanA fifthRequest
Return ok to client: fifthRequest
Any ideas on how to solve this? I saw there was an issue in the RxJava project about examples of EventBus implementations, but I could not find it.
I'd also like my services not to need any RxJava-specific error handling.
Or is there another way of pushing events to multiple other observables and then zipping the results together again?
Upvotes: 1
Views: 968
Reputation: 69997
Errors in RxJava are terminal events that tear down the observable stream (unless they are captured or resumed by the onErrorXXX operators or onExceptionResumeNext). If you want to keep the streams alive, you need to wrap each error in some other value type (e.g., Notification) and unwrap it at the relevant locations.
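To make the wrap-and-unwrap idea concrete, here is a minimal, library-free sketch in plain Java. It deliberately does not use RxJava: the `Result` type is a hypothetical stand-in for a Notification-like wrapper, and `Bus`, `SafeBusSketch`, and `demo` are names invented for this illustration only. It shows just the principle (a handler's exception becomes a value, so the other handlers still run and the caller decides what to do with the error), not how the linked gist does it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class SafeBusSketch {

    /** Hypothetical Notification-like wrapper: a delivered event, or the error a handler threw. */
    static final class Result {
        final Object event;
        final Throwable error; // null when the handler succeeded

        Result(Object event, Throwable error) {
            this.event = event;
            this.error = error;
        }

        boolean isError() {
            return error != null;
        }
    }

    /** Hypothetical bus: one failing handler never prevents delivery to the others. */
    static final class Bus {
        private final List<Consumer<Object>> handlers = new ArrayList<>();

        void subscribe(Consumer<Object> handler) {
            handlers.add(handler);
        }

        /** Delivers the event to every handler and wraps each outcome in a Result. */
        List<Result> post(Object event) {
            List<Result> results = new ArrayList<>();
            for (Consumer<Object> handler : handlers) {
                try {
                    handler.accept(event);
                    results.add(new Result(event, null));
                } catch (RuntimeException e) {
                    // Wrap the error as a value instead of letting it abort delivery.
                    results.add(new Result(event, e));
                }
            }
            return results;
        }
    }

    /** Runs three requests through the bus and returns the log lines that were produced. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Bus bus = new Bus();
        // The services contain no bus-specific error handling; they simply throw.
        bus.subscribe(e -> {
            if (e.toString().contains("Error")) {
                throw new RuntimeException("service exception for " + e);
            }
            log.add("service1 handled " + e);
        });
        bus.subscribe(e -> log.add("service2 handled " + e));

        for (String request : Arrays.asList("first", "secondError", "third")) {
            // Unwrap at the edge (the "rest layer" in the question).
            boolean failed = false;
            for (Result result : bus.post(request)) {
                failed = failed || result.isError();
            }
            log.add((failed ? "error to client: " : "ok to client: ") + request);
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the second request fails in the first service, yet the second service still handles it and the third request is processed normally by both. In RxJava itself, the analogous move would be to carry such wrapped values through the subject rather than letting a subscriber's exception terminate it.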
Here is an example gist that shows an eventbus implementation over RxJava constructs. Note the following things:
Upvotes: 1