Benjamin K
Benjamin K

Reputation: 296

How to execute many RxJava2 flux in a row

I'm introducing myself about RxJava2, but i feel like i'm doing something wrong. In my case, i want to do some following asynchronous actions.

In this example, the first action is to check if the device is connected (wifi or data, let's admit it take time), then i want to connect to an api and then i want to do a http call for get a list (observable) and then work with it. If one of those operation fail, an onError or exception should be raised and handled in the subscribe.

I have this code who works:

Single.create((SingleEmitter<Boolean> e) -> e.onSuccess(Connectivity.isDeviceConnected(MainActivity.this)) )
    .subscribeOn(Schedulers.io())
    .flatMap(isDeviceConnected -> {
        Log.i("LOG", "isDeviceConnected : "+ isDeviceConnected);
        if(!isDeviceConnected)
            throw new Exception("whatever"); // TODO : Chercher vrai erreur

        return awRepository.getFluxAuthenticate(host, port, user, password); // Single<DisfeApiAirWatch>
    })
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps)  // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

But do a single who emmit a boolean for a simple "if" sounds wrong. A Completable seems more logical (work or not, continue or stop). I tried with the following code but it's not working.

Completable.create((CompletableEmitter e) -> {
    if(Connectivity.isDeviceConnected(MainActivity.this))
        e.onComplete(); // Guess not good, should call the complete of subscribe ?
    else
        e.onError(new Exception("whatever"));
} ).toObservable()
    .subscribeOn(Schedulers.io())
    .flatMap(awRepository.getFluxAuthenticate(host, port, user, password)) //Single<DisfeApiAirWatch>
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps) // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

How to make this code work ?

I know i can do a first subscribe on the complatable and in the "onSuccess" of this one write another flux / the rest of the code. But i don't think stack flows inside each other is a good solution.

Best regards

Upvotes: 1

Views: 73

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Completable has no value so flatMap will never be invoked. You have to use andThen and make the authentication success value the input for the subsequent flatMap:

Completable.create((CompletableEmitter e) -> {
    if(Connectivity.isDeviceConnected(MainActivity.this))
        e.onComplete();
    else
       e.onError(new Exception("whatever"));
})
.subscribeOn(Schedulers.io())
.andThen(awRepository.getFluxAuthenticate(host, port, user, password)) // <-----------
.flatMapObservable(awRepository::getFluxManagedApps)
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::hideProgressDialog)
.subscribe(
        app -> Log.i("LOG", "OnNext : "+ app),
        error -> Log.i("LOG", "Error : " + error),
        () -> Log.i("LOG", "Complete : ")

);

Upvotes: 1

Related Questions