Thomas Keller
Thomas Keller

Reputation: 6060

RxJava flow: conditional operators and error handling

I'm a newbie to RxJava and try to wrap my head around a more complex login logic that has three async methods to process. For me this is "if I get this thing converted to RxJava, anything (tm) is possible" :)

So what I want to do is the following:

Call A -> (Process A) -> Call B with results of A -> (Process B) -\
                    \                                              -> Combine and Subscribe
                     \-> Call C with results of A -> (Process C) -/

Now the problem is that the Call C branch should only be executed on a specific condition, not otherwise (Combine and Subscribe could then receive a NULL value from that branch, this is ok).

Also, the error handling is non-trivial: While Call A and Call C (if this runs at all) need to deliver their errors via onError to the final subscriber, Call B's "success" is rather optional and can be neglected in case of failure.

This is what I came up with so far, which ignores the "C" branch still:

 mApi.callA(someArgs)
            // a transition operator to apply thread schedulers
            .compose(applySchedulers())
            // from rxlifecycle-components, unsubscribes automatically 
            // when the activity goes down  
            .compose(bindToLifecycle())
            // possibly other transformations that should work on the (error)
            // states of the first and the following, chained API calls
            .flatMap(response -> processA(response))
            .flatMap(response -> mApi.callB(response.token))
            .flatMap(response -> processB(response))
            // redirects HTTP responses >= 300 to onError()
            .lift(new SequenceOperators.HttpErrorOperator<>())
            // checks for application-specific error payload and redirects that to onError()
            .lift(new SequenceOperators.ApiErrorOperator<>())
            .subscribe(this::allDone this::failure);

I looked around at the Wiki for conditional operators, but I cannot find a hint how to kick-off the Call C branch.

Also, I'm unsure if my SequenceOperators work out this way, i.e. can be put after all requests in the chain, or whether I need several of those, each placed intermediately after a flatMap() operator that triggers a new Call.

Could anybody point me into the right direction?

Thanks!

Upvotes: 3

Views: 1623

Answers (1)

Andrea Baccega
Andrea Baccega

Reputation: 27623

You should use the Zip operator :) the result should look like this:

mApi.callA(someArgs)
        // ...
        .flatMap(response -> processA(response))
        .flatMap(response -> {
              return Observable.zip(
                    callB(response),
                    callC(response),
                    (rA,rB) -> {
                          // or just return a new Pair<>(rA, rB)
                          return combineAPlusB(rA,rB)
                    }
              )
        })
        // ...
        .subscribe(this::allDone this::failure);

Upvotes: 5

Related Questions