jesobremonte
jesobremonte

Reputation: 3406

RxJava chaining calls, ending and returing the first call conditionally

I need advice on how to accomplish the following with RxJava. I need to do a series of chained calls, where if any of the calls has a response which contains a specific value, the subsequent calls must not happen, and that call's value must be returned.

I'm trying something like this:

public Observable<ReturnObject> doChainedApiCall() {
    Observable<ReturnObject> obs = service.apiCall1()
        .flatMap({
            if (call1Response.hasValue) {
                // don't do anymore api calls, return this
                ReturnObject obj = new ReturnObject();
                obj.setSomething();
                return Observable.just(obj);
            }
            return service.apiCall2();
        })
        .flatMap({
            if (call2Response.hasValue) {
                // don't do anymore api calls, return this
                ReturnObject obj = new ReturnObject();
                obj.setSomething();
                return Observable.just(obj);
            }
            return service.apiCall3();
        })
        .flatMap({
            ReturnObject obj = new ReturnObject();
            return Observable.just(obj);
        });

     return obs;
}

This code will not compile, it's just an idea of what I'm thinking of doing.

How can I make sure that the chain will end at the first call and return the value if the check inside it is true?

Upvotes: 1

Views: 1968

Answers (3)

jesobremonte
jesobremonte

Reputation: 3406

I have found an answer to my own question.

Observable<ReturnObject> observable = Observable.concat(
    apiCall1(),
    apiCall2(),
    apiCall3()
).flatMap(response -> {
    if (response.value == whatWeAreLookingFor) {
        ReturnObject obj = new ReturnObject();
        // set obj properties and return it.
        return Observable.just(obj);
    }
    // Return empty to continue with next call from .concat() list.
    return null; // or return Observable.empty();
}).first(); // Makes sure only the first non-empty result is called and returned.

The .first() can even be replaced with a .firstOrDefault(Something) to make sure something comes back in the end, if none of the 3 api calls has what we're looking for.

Thanks for the other suggestions. I think this way is simplest, if you know a simpler way to achieve the same, I'm interested to know.

Upvotes: 2

JohnWowUs
JohnWowUs

Reputation: 3083

The switch is the right track but you'll have to write your own custom operator that switches if a condition is met as there isn't an existing one. I modified OperatorSwitchIfEmpty operator from the rxjava github repository to do this

import rx.*;
import rx.functions.Func1;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;


public final class OperatorSwitchIfMatch<T> implements Observable.Operator<T, T> {
    private final Observable<? extends T> alternate;
    private final Func1<T, Boolean> selector ;
    public OperatorSwitchIfMatch(Observable<? extends T> alternate, Func1<T, Boolean> selector) {
        this.alternate = alternate;
        this.selector  = selector;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        final SerialSubscription ssub = new SerialSubscription();
        ProducerArbiter arbiter = new ProducerArbiter();
        final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, ssub, arbiter, alternate,selector);
        ssub.set(parent);
        child.add(ssub);
        child.setProducer(arbiter);
        return parent;
    }

    private static final class ParentSubscriber<T> extends Subscriber<T> {

        private boolean matched = true;
        private final Subscriber<? super T> child;
        private final SerialSubscription ssub;
        private final ProducerArbiter arbiter;
        private final Observable<? extends T> alternate;
        private final Func1<T, Boolean> selector ;

        ParentSubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends T> alternate, Func1<T, Boolean> selector) {
            this.child = child;
            this.ssub = ssub;
            this.arbiter = arbiter;
            this.alternate = alternate;
            this.selector = selector;
        }

        @Override
        public void setProducer(final Producer producer) {
            arbiter.setProducer(producer);
        }

        @Override
        public void onCompleted() {            
                child.onCompleted();
        }

        private void subscribeToAlternate() {
            AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
            ssub.set(as);
            alternate.unsafeSubscribe(as);
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onNext(T t) {
            if (selector.call(t)) {
                if (!child.isUnsubscribed()) {
                    subscribeToAlternate();
                }
            } else {
                child.onNext(t);
            }
            arbiter.produced(1);
        }
    }

    private static final class AlternateSubscriber<T> extends Subscriber<T> {

        private final ProducerArbiter arbiter;
        private final Subscriber<? super T> child;

        AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
            this.child = child;
            this.arbiter = arbiter;
        }

        @Override
        public void setProducer(final Producer producer) {
            arbiter.setProducer(producer);
        }

        @Override
        public void onCompleted() {
            child.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onNext(T t) {
            child.onNext(t);
            arbiter.produced(1);
        }        
    }
}

You can then use it like

import rx.Observable;

public class SwitchMatchTest {


    public static void main(String[] args) throws InterruptedException {
        Observable<Integer> stream1 = Observable.just(9,1,1,1,1,9,1); // Will be skipped because it starts with 9
        Observable<Integer> stream2 = Observable.just(9,2,2,2,9,11); // Will be skipped because stream2 starts with 9
        Observable<Integer> stream3 = Observable.just(9,3,3,9,11);  
        Observable<Integer> stest = stream1.lift(new OperatorSwitchIfMatch<Integer>(stream2, x -> x>6)).lift(new OperatorSwitchIfMatch<Integer>(stream3, x -> x>6));

        stest.subscribe(l -> System.out.println("Emitted : " + l), e -> {}, () -> System.out.println("Completed"));
        Thread.sleep(2000L);
    }

}

In your case you'd use something like

service.apiCall1().lift(new OperatorSwitchIfMatch<Response<AnObject>>(service.apiCall2(), x -> !x.hasValue))
                  .lift(new OperatorSwitchIfMatch<Response<AnObject>>(service.apiCall3(), x -> !x.hasValue))
                  .lift(new OperatorSwitchIfMatch<Response<AnObject>>(somedefaultObservable, x -> !x.hasValue))

Upvotes: 1

RvanHeest
RvanHeest

Reputation: 869

Assuming your apiCallX methods all return an Observable and if they all complete properly with an onCompleted event, you can use the switchIfEmpty and/or defaultIfEmpty operators. Something like this:

service.apiCall1()
       .switchIfEmpty(service.apiCall2())
       .switchIfEmpty(service.apiCall3())
       .defaultIfEmpty(/*maybe some default some value?*/)

Btw. all operators are nicely listed (most including marble diagrams) in the javadoc.

Upvotes: 0

Related Questions