David Hackro

Reputation: 3712

Execute multiple Subscribers in RxJava

I have a problem using RxJava. I need to execute a UseCase twice with different parameters and store each response in its own variable. The problem is that both variables already hold a value before the second UseCase executes, and I don't know why.

Answer at the end of the question

Example: executing the first GetCompanies fills departureTrip with the response, as expected, but returnTrip also holds a value before getCompaniesReturn has executed.

Declaration Variables

private GetCompanies getCompanies;
private GetCompanies getCompaniesReturn;
private SearchResponseDomain departureTrip;
private SearchResponseDomain returnTrip;

Execution

getCompanies.execute(new CompaniesObserver());
getCompaniesReturn.execute(new CompaniesTravelDoubleObserver());

GetCompanies

public class GetCompanies extends UseCase {

    private final AppRepository repository;
    private String origin;
    private String destination;
    private String date;
    private String passengers;

    @Inject
    public GetCompanies(AppRepository repository) {
        this.repository = repository;
    }

    @Override
    protected Observable buildObservableUseCase() {
        return this.repository.getAllResultsSearch(origin, destination, date, passengers);
    }
}

UseCase

public abstract class UseCase {


    private Subscription subscription = Subscriptions.empty();

    protected UseCase() {
    }

    @SuppressWarnings("unchecked")
    public void execute(Subscriber UseCaseSubscriber) {
        this.subscription = this.buildObservableUseCase()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(UseCaseSubscriber);
    }

    /**
     * Unsubscribes from current {@link rx.Subscription}.
     */
    public void unsubscribe() {
        if (!subscription.isUnsubscribed()) {
            subscription.unsubscribe();
        }
    }

    protected abstract Observable buildObservableUseCase();
}

Code execution getCompanies

private final class CompaniesObserver extends DefaultSubscriber<SearchResponseDomain> {
    @Override
    public void onCompleted() {
        combineResults(departureTrip,returnTrip);
    }

    @Override
    public void onError(Throwable e) {
        super.onError(e);
    }

    @Override
    public void onNext(SearchResponseDomain searchResponseDomain) {
        super.onNext(searchResponseDomain);
        departureTrip = searchMapper.reverseMap(searchResponseDomain);
    }
}

Code execution CompaniesTravelDoubleObserver

private final class CompaniesTravelDoubleObserver extends DefaultSubscriber<SearchResponseDomain> {
    @Override
    public void onCompleted() {
        super.onCompleted();
        combineResults(departureTrip,returnTrip);
    }

    @Override
    public void onError(Throwable e) {
        super.onError(e);
    }

    @Override
    public void onNext(SearchResponseDomain searchResponseDomain) {
        super.onNext(searchResponseDomain);
        returnTrip = searchMapper.reverseMap(searchResponseDomain);
    }
}

Method that checks whether both results have arrived

public void combineResults(SearchResponsePresentation departureTrip, SearchResponsePresentation returnTrip) {
    if (departureTrip != null && returnTrip != null) {
        getView().hideLoading();
        getView().showCompanies(departureTrip, returnTrip);
    }
}

Solution

After reading many posts and discussions, I found a solution with the help of user @Hans Wurst.

1 - My problem was that I did not have access to the Observable, and I needed to subscribe in the presenter.

GetCompanies

public class GetCompanies //extends UseCase {
{
    private final AppRepository repository;
    private String origin;
    private String destination;
    private String date;
    private String passengers;

    @Inject
    public GetCompanies(AppRepository repository) {
        this.repository = repository;
    }

    //@Override
    public Observable buildObservableUseCase() {
        return this.repository.getAllResultsSearch(origin, destination, date, passengers);
    }


    public void setOrigin(String origin) {
        this.origin = origin;
    }

    public void setDestination(String destination) {
        this.destination = destination;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public void setPassengers(String passengers) {
        this.passengers = passengers;
    }
}

Presenter

Observable<SearchResponseDomain> departureTrip = getCompanies.buildObservableUseCase().subscribeOn(Schedulers.io());
Observable<SearchResponseDomain> returnTrip = getCompaniesReturn.buildObservableUseCase().subscribeOn(Schedulers.io());
// Subs. to both observables. If one finishes -> whole stream closes
Observable<List<SearchResponseDomain>> zip = Observable.zip(departureTrip, returnTrip, (d, r) -> {
    return Arrays.asList(d, r);
});
// Subscribe
zip.observeOn(AndroidSchedulers.mainThread())
        .subscribe(new CompaniesTravelDoubleObserver());

   //------------------------------------------------------------//
private final class CompaniesTravelDoubleObserver extends DefaultSubscriber<List<SearchResponseDomain>> {
    @Override
    public void onCompleted() {
        getView().hideLoading();

    }
    @Override
    public void onError(Throwable e) {
        // Hide the loading indicator once, then report the error.
        getView().hideLoading();
        networkError.setError(e);
        getView().showError(networkError.getAppErrorMessage());
    }

    @Override
    public void onNext(List<SearchResponseDomain> searchResponseDomains) {
        getView().showCompanies(searchMapper.reverseMap(searchResponseDomains));

    }
}
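
For comparison, the same "deliver once, when both results are in" behaviour can be sketched with only the JDK, using CompletableFuture.thenCombine in place of Observable.zip. This is not RxJava and not the code used in the presenter. It is a minimal sketch in which search(...) is a hypothetical stand-in for repository.getAllResultsSearch(...), and String stands in for SearchResponseDomain.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only analogue of the zip-based solution; this is NOT RxJava code.
final class CombineBothTrips {

    // Hypothetical stand-in for repository.getAllResultsSearch(...):
    // runs asynchronously and yields one result.
    static CompletableFuture<String> search(String origin, String destination) {
        return CompletableFuture.supplyAsync(() -> origin + "->" + destination);
    }

    // Starts both searches and completes only when both results are available,
    // so the caller never sees one trip without the other.
    static CompletableFuture<List<String>> searchRoundTrip(String origin, String destination) {
        CompletableFuture<String> departure = search(origin, destination);
        CompletableFuture<String> back = search(destination, origin);
        return departure.thenCombine(back, (d, r) -> Arrays.asList(d, r));
    }

    public static void main(String[] args) {
        // join() blocks until both searches have finished.
        List<String> trips = searchRoundTrip("MEX", "GDL").join();
        System.out.println(trips);
    }
}
```

The point of the design is the same as with zip: there is a single place where the combined result is consumed, instead of two callbacks that each write a field and then check whether the other field is already set.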

Upvotes: 1

Views: 334

Answers (1)

Sergej Isbrecht

Reputation: 4002

I will answer the question as I understand it: there will be two different getCompanies requests. Once both requests have returned (each returns one value, like a Single), the result must be shown in the UI.

The zip operator combines each value from one stream with the corresponding value from the other stream.
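
To make the pairing rule concrete, here is a small plain-Java sketch of what zip does with the values, written over lists so that it runs without RxJava. It is only an illustration of the semantics under that assumption, not the library's implementation; the class ZipSketch and its zip method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java illustration of zip's pairing rule; this is NOT RxJava code.
final class ZipSketch {

    // Pairs the i-th element of `left` with the i-th element of `right`
    // and stops as soon as either side runs out of elements.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            result.add(combiner.apply(l.next(), r.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> departures = Arrays.asList("dep-1", "dep-2");
        List<String> returns = Arrays.asList("ret-1");
        // Only one pair is produced because `returns` has a single element.
        List<String> pairs = zip(departures, returns, (d, r) -> d + "+" + r);
        System.out.println(pairs); // prints [dep-1+ret-1]
    }
}
```

With two requests that each emit exactly one value, this rule means the combiner runs exactly once, after both values exist.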

Please note that, in most cases, you should compose observables with operators, and there should be very few places where you actually subscribe.

@Test
public void name() throws Exception {
    Observable<SearchResponseDomain> departureTrip = getCompanies();
    Observable<SearchResponseDomain> returnTrip = getCompanies();

    // Subs. to both observables. If one finishes -> whole stream closes
    Observable<List<SearchResponseDomain>> zip = Observable.zip(departureTrip, returnTrip, (d, r) -> {
        return Arrays.asList(d, r);
    });

    // Subscribe
    zip.observeOn(AndroidSchedulers.mainThread())
            .subscribe(searchResponseDomain -> {
                // show stuff in UI
            });

}

// TODO: add params to getCompanies...
private Observable<SearchResponseDomain> getCompanies() {
    return Observable.just(new SearchResponseDomain());
}

class SearchResponseDomain {

}

I hope I captured your intent from your code above. If you want to have a clean UI-experience, please have a look at this presentation from Jake Wharton: https://speakerdeck.com/jakewharton/the-state-of-managing-state-with-rxjava-devoxx-us-2017

Upvotes: 1
