Reputation: 2769
I'm trying to use RxJS to process a stream of items, and I'd like it to retry any failures, preferably with a delay (and exponential backoff if possible). I need to guarantee ordering, so I effectively want to block the stream while an item is being retried, and I don't want to reprocess any items that were already processed.
So I was trying to play with retryWhen, following this example:
const { interval, timer } = Rx;
const { take, map, retryWhen, delayWhen, tap } = RxOperators;
const source = take(5)(interval(1000));
source.pipe(
  map(val => {
    if (val >= 3) {
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      delayWhen(val => timer(1000))
    )
  )
);
But the stream restarts from the beginning; it doesn't just retry the last item.
Is it possible to achieve what I want? I tried other operators from the docs as well, with no luck. Would this somehow go against the RxJS philosophy?
Upvotes: 0
Views: 450
Reputation: 6706
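The retryWhen should be moved to an inner Observable so that it handles only the failed value and keeps the main Observable working. retryWhen resubscribes to the Observable it is applied to, so applying it to the outer stream restarts the whole sequence.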
Try something like the following:
// import { timer, interval, of } from 'rxjs';
// import { concatMap, delayWhen, map, retryWhen, take } from 'rxjs/operators';
const source = interval(1000).pipe(take(5));
source.pipe(
  // concatMap handles one value at a time, so ordering is preserved
  concatMap((value) =>
    of(value).pipe(
      map((val) => {
        if (val >= 3) {
          throw val;
        }
        return val;
      }),
      // Resubscribes only to this inner Observable, i.e. retries only this value
      retryWhen((errors) => errors.pipe(delayWhen(() => timer(1000))))
    )
  )
);
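For the exponential backoff mentioned in the question, the fixed timer(1000) could be swapped for a delay that grows with the retry index. The following is only a sketch under assumptions: backoffDelay and processInOrder are hypothetical helper names, the 1000 ms base, 30000 ms cap and 5-retry limit are made-up defaults, and work is assumed to return an Observable or a Promise.

```javascript
// Pure helper: delay in ms before retry number `attempt` (0-based), doubling each time.
// baseMs and maxMs are assumed defaults, not values taken from the question.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Hypothetical helper: runs `work` on each source value, one at a time and in order,
// retrying only the failing value with a growing delay.
function processInOrder(source, work, maxRetries = 5) {
  // Loaded lazily so backoffDelay stays usable without rxjs installed.
  const { defer, timer } = require('rxjs');
  const { concatMap, mergeMap, retryWhen } = require('rxjs/operators');

  return source.pipe(
    concatMap((value) =>
      // defer calls `work` again on every resubscription, i.e. on every retry.
      defer(() => work(value)).pipe(
        retryWhen((errors) =>
          errors.pipe(
            // The second argument of the mergeMap callback is the 0-based index
            // of the error, which serves as the retry counter for this value.
            mergeMap((err, attempt) => {
              if (attempt >= maxRetries) {
                throw err; // give up: the error propagates to the outer stream
              }
              return timer(backoffDelay(attempt));
            })
          )
        )
      )
    )
  );
}
```

Something like processInOrder(source, (val) => saveItem(val)).subscribe(...) would then use it, where saveItem stands in for whatever processing can fail. While one value is being retried, concatMap buffers later source values, so ordering holds and already processed items are not touched again. Note that retryWhen is deprecated in RxJS 7 in favor of retry with a delay option, which may be the better fit on newer versions.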
Upvotes: 1