Reputation: 1467
I have a use case where I need to aggregate the responses from whichever of several Observable objects have finished, and return the aggregate to the client. How can I achieve this with RxJava? Here is the code snippet I have written; the problem is that it doesn't return anything after the timeout.
Observable<AggregateResponse> aggregateResponse = Observable
    .zip(callServiceA(endpoint), callServiceB(endpoint), callServiceC(endpoint),
        (Mashup resultA, Mashup resultB, Mashup resultC) -> {
            AggregateResponse result = new AggregateResponse();
            result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
            return result;
        })
    .timeout(5, TimeUnit.SECONDS);
Subscriber:
aggregateResponse.subscribe(new Subscriber<AggregateResponse>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable throwable) {
        // On timeout, this is called instead of the finished tasks being aggregated
        System.out.println(throwable.getMessage());
        System.out.println(throwable.getClass());
    }

    @Override
    public void onNext(AggregateResponse response) {
        asyncResponse.resume(response);
    }
});
Upvotes: 0
Views: 252
Reputation: 10267
You need to put the timeout operator on each Observable. zip waits for all Observables to emit a value before it emits a result, so if just one of them takes longer than the timeout while the others have already emitted, the timeout terminates the stream (with onError) before the zipped Observable has a chance to emit.
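The effect can be reproduced without RxJava. The following is only a minimal sketch using the JDK's CompletableFuture as a stand-in for the zipped Observable, not RxJava itself; the class and method names (CombinedTimeoutDemo, combinedTimesOut) are hypothetical. Two sources finish immediately and one never finishes, and a single timeout applied to the combined result discards the two values that were already available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CombinedTimeoutDemo {
    // Returns true when waiting on the combined result times out,
    // even though two of the three sources have already completed.
    static boolean combinedTimesOut(long timeoutMillis) {
        CompletableFuture<String> fastA = CompletableFuture.completedFuture("A");
        CompletableFuture<String> fastB = CompletableFuture.completedFuture("B");
        CompletableFuture<String> slowC = new CompletableFuture<>(); // never completes

        // Like zip, the combined future needs a value from every source.
        CompletableFuture<String> combined = fastA
            .thenCombine(fastB, (a, b) -> a + " " + b)
            .thenCombine(slowC, (ab, c) -> ab + " " + c);

        try {
            combined.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true; // "A" and "B" are lost along with the slow source
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```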
What you should do, assuming you want to ignore timed-out sources while keeping the rest, is add a timeout operator to each Observable, together with error handling such as onErrorReturn. The error handler can return some kind of 'empty' result (you can't use null in RxJava 2), and when you aggregate the results you ignore those empty ones:
Observable<AggregateResponse> aggregateResponse = Observable
    .zip(callServiceA(endpoint)
            .timeout(5, TimeUnit.SECONDS)
            .onErrorReturn(throwable -> new Mashup()),
        callServiceB(endpoint)
            .timeout(5, TimeUnit.SECONDS)
            .onErrorReturn(throwable -> new Mashup()),
        callServiceC(endpoint)
            .timeout(5, TimeUnit.SECONDS)
            .onErrorReturn(throwable -> new Mashup()),
        (Mashup resultA, Mashup resultB, Mashup resultC) -> {
            AggregateResponse result = new AggregateResponse();
            result.setResult(resultA.getName() + " " + resultB.getName() + " " + resultC.getName());
            return result;
        });
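Note that the snippet above still concatenates all three names, so the step of ignoring empty results is left to you. Here is a rough sketch of that step, again using plain JDK classes rather than RxJava so that it runs on its own (Java 9+ for completeOnTimeout). The names PartialAggregation, joinNonEmpty and aggregate are hypothetical, and the empty string stands in for the 'empty' Mashup. A helper along the lines of joinNonEmpty could presumably be called from the zip function in place of the string concatenation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class PartialAggregation {
    // Joins the given names with single spaces, skipping null or empty entries.
    static String joinNonEmpty(String... names) {
        List<String> present = new ArrayList<>();
        for (String name : names) {
            if (name != null && !name.isEmpty()) {
                present.add(name);
            }
        }
        return String.join(" ", present);
    }

    // Gives a source its own timeout and turns a timeout or failure into "".
    // Note: completeOnTimeout completes the future that was passed in.
    static CompletableFuture<String> withFallback(CompletableFuture<String> source,
                                                  long timeoutMillis) {
        return source
            .completeOnTimeout("", timeoutMillis, TimeUnit.MILLISECONDS)
            .exceptionally(throwable -> "");
    }

    // Waits for all three guarded sources, then aggregates whatever arrived.
    static String aggregate(CompletableFuture<String> a,
                            CompletableFuture<String> b,
                            CompletableFuture<String> c,
                            long timeoutMillis) {
        CompletableFuture<String> safeA = withFallback(a, timeoutMillis);
        CompletableFuture<String> safeB = withFallback(b, timeoutMillis);
        CompletableFuture<String> safeC = withFallback(c, timeoutMillis);
        CompletableFuture.allOf(safeA, safeB, safeC).join();
        return joinNonEmpty(safeA.join(), safeB.join(), safeC.join());
    }
}
```

As with onErrorReturn in the RxJava version, the fallback here swallows every failure and not only timeouts, so errors you want to surface would need separate handling.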
Upvotes: 1