Konstantin Konopko

Reputation: 5422

Call two chained independent methods using Rx

I have two async methods that have to be called as part of one operation. Each method can either complete successfully or fail with an error. In case of an error, I have to retry that method once more after a delay of 2 seconds. That is, I should call both methods regardless of the result of either one. In the error callback I want to know in which method the error occurred, or whether it occurred in both.

It seems I should use Completable for this, but I'm an absolute newbie in Rx.

private void method1(final CompletableEmitter e, String path){
Database.getInstance().getReference(path).addListener(new Listener() {
            @Override
            public void onDataChange(Data data) {
               //todo something
               e.onComplete();                
            }
            @Override
            public void onCancelled(DatabaseError databaseError) {
               e.onError(new Throwable(databaseError.getMessage()));
            }
        });
}

Method2 is the same. The following code doesn't work properly.

    Completable completable1 = Completable.create(e -> method1(e, path));
    Completable completable2 = Completable.create(e -> method2(e, path));

    completable1
            .doOnError(…)
            .retry(1)
            .andThen(completable2 //never called if completable1 gets onError each time
                    .retry(1)
                    .doOnError(…))
            .subscribe(…).dispose();

Upvotes: 0

Views: 249

Answers (1)

crgarridos

Reputation: 9273

There are many ways to do this. I'll limit myself to explaining how to achieve it using two Completables.

Let's say you have two completables:

Completable doSomething = ...
Completable doSomethingElse = ...

To execute these sequentially, you can concatenate them using the andThen operator. Then, to delay a retry when an error occurs, you can use retryWhen:

doSomething.andThen(doSomethingElse)
    // React to each error: wait 2 seconds, then resubscribe.
    .retryWhen { errors -> errors.flatMap { Flowable.timer(2, TimeUnit.SECONDS) } }
    .subscribe()

The snippet above retries indefinitely if the error keeps occurring. To go further, you can limit the number of retries:

.retryWhen { errors ->
    val retryCounter = AtomicInteger()
    errors.flatMap {
        if (retryCounter.getAndIncrement() <= 3)
            Flowable.timer(2, TimeUnit.SECONDS)
        else Flowable.error(it)
    }
}
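To see how the counter bounds the retries, here is a minimal sketch, assuming RxJava 2 (`io.reactivex`) on the classpath. The names `retryLimited` and `countAttempts` are hypothetical and not part of the original answer, and the delay is shortened to milliseconds so the demo finishes quickly. Note that the `<= 3` check in the snippet above admits four retries (counter values 0 through 3); the sketch uses `< maxRetries` so the parameter equals the number of retries.

```kotlin
import io.reactivex.Completable
import io.reactivex.Flowable
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: retries at most maxRetries times, waiting delayMs before each retry.
fun Completable.retryLimited(maxRetries: Int, delayMs: Long): Completable =
    retryWhen { errors ->
        // The handler runs once per subscription, so the counter is per subscription.
        val retryCounter = AtomicInteger()
        errors.flatMap { e ->
            if (retryCounter.getAndIncrement() < maxRetries)
                Flowable.timer(delayMs, TimeUnit.MILLISECONDS)
            else
                Flowable.error<Long>(e) // retries used up: pass the last error downstream
        }
    }

// Subscribes to an always-failing Completable and returns how often it was attempted.
fun countAttempts(maxRetries: Int): Int {
    val attempts = AtomicInteger()
    val alwaysFails = Completable.fromAction {
        attempts.incrementAndGet()
        throw IllegalStateException("simulated failure")
    }
    try {
        alwaysFails.retryLimited(maxRetries, delayMs = 10).blockingAwait()
    } catch (e: IllegalStateException) {
        // Expected once the retries are exhausted.
    }
    return attempts.get()
}

fun main() {
    println(countAttempts(3)) // 1 initial attempt + 3 retries
}
```

`blockingAwait()` is used here only to keep the demo synchronous; in application code you would normally subscribe and keep the returned Disposable instead.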

If you want to retry only when a given type of error occurs, you can use:

.retryWhen { errors ->
    val retryCounter = AtomicInteger()
    errors.flatMap {
        if (it is YourSpecificError && retryCounter.getAndIncrement() <= 3)
            Flowable.timer(2, TimeUnit.SECONDS)
        else Flowable.error(it)
    }
}

If you want to retry each one independently, you can use:

doSomething.retryWhen { ... }
    .andThen(doSomethingElse.retryWhen { ... })
    .subscribe()

In addition, to avoid duplicating the retryWhen logic, you could encapsulate it in an extension function:

fun Completable.retryDelayed(): Completable {
    return this.retryWhen { errors ->
        val retryCounter = AtomicInteger()
        errors.flatMap {
            if (it is YourSpecificError && retryCounter.getAndIncrement() <= 3)
                Flowable.timer(2, TimeUnit.SECONDS)
            else Flowable.error(it)
        }
    }
}

If you want to run your completables in parallel, you can use the merge operator:

val doAll = Completable.merge(listOf(doSomething, doSomethingElse))
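The question also asks that both methods run even if one of them fails, and that the error callback reveal which one failed. Plain merge stops at the first error, so one possible sketch (an assumption on my part, not something the answer above prescribes) is RxJava 2's `Completable.mergeArrayDelayError`, which waits for all sources and reports a `CompositeException` when more than one fails. `StepFailed`, `tagged` and `failedSteps` are hypothetical names introduced for illustration.

```kotlin
import io.reactivex.Completable
import io.reactivex.exceptions.CompositeException

// Hypothetical wrapper that records which step produced the error.
class StepFailed(val step: String, cause: Throwable) : RuntimeException("$step failed", cause)

// Re-labels any error from this Completable with the given step name.
fun Completable.tagged(step: String): Completable =
    onErrorResumeNext { e -> Completable.error(StepFailed(step, e)) }

// Runs both steps even if one fails and returns the names of the failed steps.
fun failedSteps(first: Completable, second: Completable): List<String> =
    try {
        Completable.mergeArrayDelayError(first.tagged("method1"), second.tagged("method2"))
            .blockingAwait()
        emptyList()
    } catch (e: RuntimeException) {
        // One failure arrives as-is; several arrive wrapped in a CompositeException.
        val errors = if (e is CompositeException) e.exceptions else listOf(e)
        errors.filterIsInstance<StepFailed>().map { it.step }
    }

fun main() {
    val ok = Completable.complete()
    val bad = Completable.error(IllegalStateException("simulated failure"))
    println(failedSteps(bad, ok))  // only the first step failed
    println(failedSteps(bad, bad)) // both steps failed
}
```

Each source can have its own retryWhen applied before `tagged`, so that only errors surviving the retries are reported. As with merge, sources that do asynchronous work run concurrently rather than one after the other.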

Upvotes: 1
