Reputation: 4011
I'm working on a project that involves Hystrix, and I decided to use RxJava. Forget Hystrix for the rest of this, though, because I believe the main problem is that I have written the Observable code incorrectly.
Need: I need a way to return an observable that represents a number of observables, each running a user task. I want that Observable to be able to return all results from the tasks, even errors.
Problem: Observable streams die on errors. If I have three tasks and the second task throws an exception, I never receive the third task's result, even if it would have succeeded.
My Code:
public <T> Observable<T> observeManagedAsync(String groupName, List<EspTask<T>> tasks) {
    return Observable
        .from(tasks)
        .flatMap(task -> {
            try {
                return new MyCommand(task.getTaskId(), groupName, task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
            } catch (Exception ex) {
                return Observable.error(ex);
            }
        });
}
Given that MyCommand is a class that extends HystrixObservableCommand, it returns an Observable and so shouldn't factor into the problems I'm seeing.
Attempt 1:
Used Observable.flatMap, as above.
Attempt 2:
Used Observable.concatMapDelayError instead of flatMap.
Any help will be greatly appreciated and probably result in me being very embarrassed for not having thought of it myself.
Additional Code
This test succeeds with Observable.flatMap, but fails when using Observable.concatMapDelayError because the tasks do not run asynchronously:
java.lang.AssertionError: Execution time ran over the 350ms limit: 608
@Test
public void shouldRunManagedAsyncTasksConcurrently() throws Exception {
    Observable<String> testObserver = executor.observeManagedAsync("asyncThreadPool", getTimedTasks());
    TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    long startTime = System.currentTimeMillis();
    testObserver.doOnError(throwable -> {
        System.out.println("error: " + throwable.getMessage());
    }).subscribe(testSubscriber);
    System.out.println("Test execution time: " + (System.currentTimeMillis() - startTime));
    testSubscriber.awaitTerminalEvent();
    long execTime = (System.currentTimeMillis() - startTime);
    System.out.println("Test execution time: " + execTime);
    testSubscriber.assertCompleted();
    System.out.println("Errors: " + testSubscriber.getOnErrorEvents());
    System.out.println("Results: " + testSubscriber.getOnNextEvents());
    testSubscriber.assertNoErrors();
    assertTrue("Execution time ran under the 300ms limit: " + execTime, execTime >= 300);
    assertTrue("Execution time ran over the 350ms limit: " + execTime, execTime <= 350);
    testSubscriber.assertValueCount(3);
    assertThat(testSubscriber.getOnNextEvents(), containsInAnyOrder("hello", "wait", "world"));
    verify(this.mockSchedulerFactory, times(3)).get("asyncThreadPool");
}
Tasks for the above unit test:
protected List<EspTask<String>> getTimedTasks() {
    EspTask longTask = new EspTask("helloTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.sleep(100); // sleep is static, so call it on the class
            return "hello";
        }
    };
    EspTask longerTask = new EspTask("waitTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.sleep(150);
            return "wait";
        }
    };
    EspTask longestTask = new EspTask("worldTask") {
        @Override
        public Object doCall() throws Exception {
            Thread.sleep(300);
            return "world";
        }
    };
    return Arrays.asList(longTask, longerTask, longestTask);
}
Upvotes: 0
Views: 2818
Reputation: 12087
Use .materialize() to allow all emissions and errors to come through as wrapped notifications, then deal with them as you wish:
.flatMap(task -> {
    try {
        return new MyCommand(task.getTaskId(), groupName, task)
            .toObservable()
            .subscribeOn(this.schedulerFactory.get(groupName))
            .materialize();
    } catch (Exception ex) {
        return Observable.error(ex).materialize();
    }
});
Upvotes: 1
Reputation: 16142
You want to use mergeDelayError:
public <T> Observable<T> observeManagedAsync(String groupName, List<EspTask<T>> tasks) {
    return Observable.mergeDelayError(Observable
        .from(tasks)
        .map(task -> {
            try {
                return new MyCommand(task.getTaskId(), groupName, task).toObservable().subscribeOn(this.schedulerFactory.get(groupName));
            } catch (Exception ex) {
                return Observable.error(ex);
            }
        }));
}
Note that your MyCommand constructor should not throw any exceptions; this allows your code to be written more concisely:
public <T> Observable<T> observeManagedAsync(String groupName, List<EspTask<T>> tasks) {
    return from(tasks)
        .map(task -> new MyCommand(task.getTaskId(), groupName, task)
            .toObservable()
            .subscribeOn(this.schedulerFactory.get(groupName)))
        .compose(Observable::mergeDelayError);
}
Keep in mind that this will still invoke onError at most once; if you need explicit handling of all errors, use something like an Either<CommandResult, Throwable> as the return type (or handle the errors and return an empty observable).
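To make the Either idea concrete, here is a minimal sketch that uses only the JDK (CompletableFuture instead of RxJava, so it runs without extra dependencies). Outcome, CollectAll, runOne and runAll are hypothetical names for illustration; none of them come from RxJava or Hystrix. The point is only that each task's failure is captured as a value, so one failing task cannot hide the results of the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical Either-like holder: a success carries a value, a failure carries the error.
final class Outcome<T> {
    final T value;
    final Throwable error;

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> Outcome<T> success(T value) { return new Outcome<>(value, null); }
    static <T> Outcome<T> failure(Throwable error) { return new Outcome<>(null, error); }

    boolean isSuccess() { return error == null; }
}

final class CollectAll {
    // Runs one task and turns a thrown exception into a failure Outcome.
    static <T> Outcome<T> runOne(Callable<T> task) {
        try {
            return Outcome.success(task.call());
        } catch (Exception ex) {
            return Outcome.failure(ex);
        }
    }

    // Submits every task to a fixed pool, then waits for all of them.
    // Outcomes are returned in the same order as the input tasks.
    static <T> List<Outcome<T>> runAll(List<Callable<T>> tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Outcome<T>>> futures = new ArrayList<>();
            for (Callable<T> task : tasks) {
                futures.add(CompletableFuture.supplyAsync(() -> runOne(task), pool));
            }
            List<Outcome<T>> outcomes = new ArrayList<>();
            for (CompletableFuture<Outcome<T>> future : futures) {
                outcomes.add(future.join());
            }
            return outcomes;
        } finally {
            pool.shutdown();
        }
    }
}
```

With three tasks where the second one throws, runAll would return three outcomes: a success, a failure holding the exception, and another success. In the RxJava version, the same shape could be the element type of the returned Observable.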
Upvotes: 1
Reputation: 1960
You can use Observable.onErrorReturn() to return a special value (e.g. null), then filter out that special value downstream. Keep in mind that the source observable will complete on error. Depending on the use case, the Observable.onErrorResumeNext() methods can be useful as well. If you are interested in error notifications, use Observable.materialize(); this converts items and the onError() and onCompleted() events into Notifications, which can then be filtered by Notification.getKind().
Edit.
All operators mentioned above should be added right after .toObservable().subscribeOn(this.schedulerFactory.get(groupName)), assuming the try/catch is absent.
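The sentinel-and-filter approach can be sketched without RxJava as well. The example below is a JDK-only analogy, not RxJava code: CompletableFuture.exceptionally plays the role of onErrorReturn, and the null check plays the role of the downstream filter. SentinelFallback and successesOnly are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SentinelFallback {
    // Starts every task on the common pool, maps any failure to null
    // (the sentinel), then drops the sentinel so only successes remain.
    static <T> List<T> successesOnly(List<Supplier<T>> tasks) {
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (Supplier<T> task : tasks) {
            CompletableFuture<T> future = CompletableFuture.supplyAsync(task);
            // Comparable in spirit to onErrorReturn: swap the error for a fallback value.
            futures.add(future.exceptionally(ex -> null));
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            T value = future.join();
            if (value != null) { // filter out the sentinel
                results.add(value);
            }
        }
        return results;
    }
}
```

The trade-off is that the errors themselves are discarded, and a task that legitimately returns null would be indistinguishable from a failed one, so a dedicated sentinel object may be a safer choice than null.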
Upvotes: 1