Reputation: 1496
I have an observable that gets items from a lot of sources:
Source { List<Item> data }
Relationship between sources and items is many-to-many and in different sources items could duplicate themselves. Item is an entity that should be uploaded to server and server does not accept duplicates. To achieve this I merge Sources and distinct their Items by their ids and then upload unique items to server. Like below:
Observable.merge(source1(), source2(), source3())
.flatMapIterable(sources -> sources)
.flatMapIterable(source::getItems)
.distinct(item -> item.getId())
.flatMapCompletabale(item -> uploadItem(item))
Item uploading could emit several errors and on some of them I should retry to upload item once again later and proceed another items while 'failed' one is waiting for its retrying.
How can I postpone retrying uploading 'failed' item and proceed other items while this one is wating for its try?
Thanks in advance!
Upvotes: 2
Views: 4264
Reputation: 166
I had to modify the above example to create a Flowable to retryWhen a Single in my RxJava2 project:
import io.reactivex.Flowable; import io.reactivex.functions.Function;
import java.util.concurrent.TimeUnit;
public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Flowable.timer(retryDelay, timeUnit);
}
return Flowable.error(throwable);
});
} }
and to apply it to my single:
.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))
Upvotes: 0
Reputation: 1496
I put this function into retryWhen method and get it working.
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;
public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
this.maxRetryCount = maxRetryCount;
this.retryDelay = retryDelay;
this.timeUnit = timeUnit;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {
if (++retryCount < maxRetryCount) {
return Observable.timer(retryDelay, timeUnit);
}
return Observable.error(throwable);
});
}
}
Upvotes: 0
Reputation: 8227
To just handle the failure of one upload, you can add an operator in the final step:
.flatMapCompletable(item->uploadItem(item))
should become
.flatMapCompletable(item->uploadItem(item)
.retryWhen(throwable ->
throwable.delay(5, TimeUnit.SECONDS)))
Edit: I learned a lot about retryWhen()
operator from this Dan Lew blog entry. You will find several examples, including using the zip()
operator with Observable.range(3)
to limit the number of retries.
Upvotes: 4