mradzinski
mradzinski

Reputation: 624

Chaining two Observables to return another

I have two observables named A<'ModelA'> and B<'ModelB'>. Each of them performs a request to a different REST service, so they extend a different model like expressed above. The request performed by any of them may fail. Now, I need to be able to chain both of them and return a ModelC object. So, the pseudo-coded stream would be something like this:

A<'ModelA'> performs the request, if it fails does something, if not then passes its result (responseModelA) to B<'ModelB'> so it can perform another REST request which involves using part of responseModelA. If B fails something happens, if not then combines its response (responseModelB) together with responseModelA (manually, setting the POJO fields) to create ModelC which is what the subscriber should recieve as a parameter on it's call() method.

Is this remotely possible to code using rxJava? I'm quite stucked into this so I'm open to any sugestions.

Thanks.

Upvotes: 2

Views: 1947

Answers (2)

kjones
kjones

Reputation: 5823

This assumes you have created a REST request interface that returns Observables similar to the following (using Retrofit this is quite easy):

interface RestApi {
    Observable<ModelA> getModelA();
    Observable<ModelB> getModelB(int modelBId);
}

class ModelA {
    int modelBId;
    Object fieldA;
}

class ModelB {
    Object fieldB;
}

class ModelC {
    Object fieldFromA;
    Object fieldFromB;

    public ModelC(Object fieldFromA, Object fieldFromB) {
        this.fieldFromA = fieldFromA;
        this.fieldFromB = fieldFromB;
    }
}

To make the ModelB request depend on the result of the ModelA request, you can use .flatMap to transform the results of one Observable into another Observable.

Then, to create ModelC, use .map to pick the desired fields from ModelA and ModelB and return the result.

RestApi restApi;

Observable<ModelC> observeModelC() {
    return restApi
            .getModelA()
            .flatMap(new Func1<ModelA, Observable<ModelC>>() {
                @Override
                public Observable<ModelC> call(final ModelA modelA) {
                    // Use the modelBId from modelA to get ModelB.
                    return restApi
                            .getModelB(modelA.modelBId)
                            // Combine A & B to create C
                            .map(new Func1<ModelB, ModelC>() {
                                @Override
                                public ModelC call(ModelB modelB) {
                                    return new ModelC(modelA.fieldA, modelB.fieldB);
                                }
                            });
                }
            });
}

Your subscriber would look like this:

observeModelC()
        .subscribe(new Observer<ModelC>() {
            @Override
            public void onCompleted() {
                // All done.
            }

            @Override
            public void onError(Throwable e) {
                // All errors from any API request will end up here.
                // For Retrofit, cast e to RetrofitError for
                // detailed error info.
            }

            @Override
            public void onNext(ModelC modelC) {
                // Yeah! - Use modelC.
            }
        });

Upvotes: 5

Dave Sexton
Dave Sexton

Reputation: 2652

Sequential composition is achieved in Rx.NET by SelectMany (I think it's FlatMap in Rx.Java?). Rx.NET also applies fail-fast semantics, thus SelectMany may be all that you need.

(C#)

IObservable<ModelC> query = A().FlatMap(B, (a, b) => C(a, b));

query.Subscribe(success => DoSomethingSuccessful(), error => DoSomethingElse(error));

where A and B are functions that return observables, and C is a function that returns ModelC.

However, if you want to do something specific when either A or B fails, then a quick solution in Rx.NET is to use the Do operator.

IObservable<ModelC> query = A().Do(_ => {}, AFailed)
                               .FlatMap(B.Do(_ => {}, BFailed), (a, b) => C(a, b));

query.Subscribe(success => DoSomethingSuccessful(), UltimateFailure);

where AFailed and BFailed are void-returning methods that accept a single Exception parameter.

Sorry I don't know Java, but maybe this will lead you to the correct solution.

Upvotes: 1

Related Questions