Reputation: 3406
I need advice on how to accomplish the following with RxJava. I need to do a series of chained calls, where if any of the calls has a response which contains a specific value, the subsequent calls must not happen, and that call's value must be returned.
I'm trying something like this:
public Observable<ReturnObject> doChainedApiCall() {
Observable<ReturnObject> obs = service.apiCall1()
.flatMap({
if (call1Response.hasValue) {
// don't do anymore api calls, return this
ReturnObject obj = new ReturnObject();
obj.setSomething();
return Observable.just(obj);
}
return service.apiCall2();
})
.flatMap({
if (call2Response.hasValue) {
// don't do anymore api calls, return this
ReturnObject obj = new ReturnObject();
obj.setSomething();
return Observable.just(obj);
}
return service.apiCall3();
})
.flatMap({
ReturnObject obj = new ReturnObject();
return Observable.just(obj);
});
return obs;
}
This code will not compile; it's just an idea of what I'm thinking of doing.
How can I make sure that the chain will end at the first call and return the value if the check inside it is true?
Upvotes: 1
Views: 1968
Reputation: 3406
I have found an answer to my own question.
Observable<ReturnObject> observable = Observable.concat(
apiCall1(),
apiCall2(),
apiCall3()
).flatMap(response -> {
if (response.value == whatWeAreLookingFor) {
ReturnObject obj = new ReturnObject();
// set obj properties and return it.
return Observable.just(obj);
}
// Return empty to continue with next call from .concat() list.
return Observable.empty(); // Prefer this over returning null, which RxJava 2 and later reject.
}).first(); // Makes sure only the first non-empty result is called and returned.
The .first() can even be replaced with .firstOrDefault(something) to make sure something comes back in the end if none of the three API calls has what we're looking for.
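To make the short-circuit behaviour concrete without an RxJava dependency, here is a minimal plain-Java sketch of the same idea: the calls are wrapped lazily, evaluated in order, and evaluation stops at the first response that matches. It is an analogy, not RxJava code. The names `FirstMatchSketch`, `call`, `firstMatch` and the canned responses are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FirstMatchSketch {
    // Records which hypothetical calls actually ran.
    static final List<String> invoked = new ArrayList<>();

    // Hypothetical stand-in for an API call: logs its name, returns a canned response.
    static Supplier<String> call(String name, String response) {
        return () -> {
            invoked.add(name);
            return response;
        };
    }

    // Evaluates the calls in order and stops at the first response equal to `wanted`.
    // Falls back to `fallback` when no call matches (the role firstOrDefault plays).
    @SafeVarargs
    static String firstMatch(String wanted, String fallback, Supplier<String>... calls) {
        return Stream.of(calls)
                .map(Supplier::get)      // lazy: only runs as far as findFirst needs
                .filter(wanted::equals)
                .findFirst()
                .orElse(fallback);
    }

    public static void main(String[] args) {
        String result = firstMatch("hit", "none",
                call("apiCall1", "miss"),
                call("apiCall2", "hit"),
                call("apiCall3", "hit"));
        // prints: hit after [apiCall1, apiCall2]
        System.out.println(result + " after " + invoked);
    }
}
```

The third call never runs because the stream pipeline is evaluated lazily, which is the same property the concat-based chain relies on: a later source is only subscribed to if the earlier ones did not produce a result.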
Thanks for the other suggestions. I think this way is the simplest; if you know a simpler way to achieve the same thing, I'd be interested to hear it.
Upvotes: 2
Reputation: 3083
The switch is the right track, but you'll have to write your own custom operator that switches when a condition is met, as there isn't an existing one. I modified the OperatorSwitchIfEmpty operator from the RxJava GitHub repository to do this:
import rx.*;
import rx.functions.Func1;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;
public final class OperatorSwitchIfMatch<T> implements Observable.Operator<T, T> {
private final Observable<? extends T> alternate;
private final Func1<T, Boolean> selector ;
public OperatorSwitchIfMatch(Observable<? extends T> alternate, Func1<T, Boolean> selector) {
this.alternate = alternate;
this.selector = selector;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final SerialSubscription ssub = new SerialSubscription();
ProducerArbiter arbiter = new ProducerArbiter();
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, ssub, arbiter, alternate,selector);
ssub.set(parent);
child.add(ssub);
child.setProducer(arbiter);
return parent;
}
private static final class ParentSubscriber<T> extends Subscriber<T> {
private boolean matched = true;
private final Subscriber<? super T> child;
private final SerialSubscription ssub;
private final ProducerArbiter arbiter;
private final Observable<? extends T> alternate;
private final Func1<T, Boolean> selector ;
ParentSubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends T> alternate, Func1<T, Boolean> selector) {
this.child = child;
this.ssub = ssub;
this.arbiter = arbiter;
this.alternate = alternate;
this.selector = selector;
}
@Override
public void setProducer(final Producer producer) {
arbiter.setProducer(producer);
}
@Override
public void onCompleted() {
child.onCompleted();
}
private void subscribeToAlternate() {
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
ssub.set(as);
alternate.unsafeSubscribe(as);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
if (selector.call(t)) {
if (!child.isUnsubscribed()) {
subscribeToAlternate();
}
} else {
child.onNext(t);
}
arbiter.produced(1);
}
}
private static final class AlternateSubscriber<T> extends Subscriber<T> {
private final ProducerArbiter arbiter;
private final Subscriber<? super T> child;
AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
this.child = child;
this.arbiter = arbiter;
}
@Override
public void setProducer(final Producer producer) {
arbiter.setProducer(producer);
}
@Override
public void onCompleted() {
child.onCompleted();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onNext(T t) {
child.onNext(t);
arbiter.produced(1);
}
}
}
You can then use it like this:
import rx.Observable;
public class SwitchMatchTest {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> stream1 = Observable.just(9,1,1,1,1,9,1); // Will be skipped because it starts with 9
Observable<Integer> stream2 = Observable.just(9,2,2,2,9,11); // Will be skipped because stream2 starts with 9
Observable<Integer> stream3 = Observable.just(9,3,3,9,11);
Observable<Integer> stest = stream1.lift(new OperatorSwitchIfMatch<Integer>(stream2, x -> x>6)).lift(new OperatorSwitchIfMatch<Integer>(stream3, x -> x>6));
stest.subscribe(l -> System.out.println("Emitted : " + l), e -> {}, () -> System.out.println("Completed"));
Thread.sleep(2000L);
}
}
In your case you'd use something like this:
service.apiCall1().lift(new OperatorSwitchIfMatch<Response<AnObject>>(service.apiCall2(), x -> !x.hasValue))
.lift(new OperatorSwitchIfMatch<Response<AnObject>>(service.apiCall3(), x -> !x.hasValue))
.lift(new OperatorSwitchIfMatch<Response<AnObject>>(somedefaultObservable, x -> !x.hasValue))
Upvotes: 1
Reputation: 869
Assuming your apiCallX methods all return an Observable, and that they all complete properly with an onCompleted event, you can use the switchIfEmpty and/or defaultIfEmpty operators. Something like this:
service.apiCall1()
.switchIfEmpty(service.apiCall2())
.switchIfEmpty(service.apiCall3())
.defaultIfEmpty(/* maybe some default value? */)
By the way, all operators are nicely listed (most with marble diagrams) in the Javadoc.
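The fall-through semantics of that chain can be sketched in plain Java with Optional, where an empty Optional stands in for an Observable that completes without emitting. This is only an analogy under that assumption, and it needs Java 9 or later for Optional.or. The names `FallThroughSketch`, `apiCall` and `resolve` are hypothetical.

```java
import java.util.Optional;

public class FallThroughSketch {
    // Counts how many hypothetical calls were actually made.
    static int callsMade = 0;

    // Hypothetical API call: a null argument yields an empty result,
    // mirroring an Observable that completes without emitting anything.
    static Optional<String> apiCall(String valueOrNull) {
        callsMade++;
        return Optional.ofNullable(valueOrNull);
    }

    // Tries each call in turn; a later call only runs if the earlier ones were empty.
    static String resolve(String r1, String r2, String r3) {
        return apiCall(r1)
                .or(() -> apiCall(r2))   // analogous to switchIfEmpty
                .or(() -> apiCall(r3))
                .orElse("default");      // analogous to defaultIfEmpty
    }

    public static void main(String[] args) {
        // prints: second, calls made: 2
        System.out.println(resolve(null, "second", "third") + ", calls made: " + callsMade);
    }
}
```

Note the precondition this shares with the RxJava version: a call that has nothing useful to return must come back empty rather than return a response object with a flag set, otherwise the chain stops at the first call.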
Upvotes: 0