nil
nil

Reputation: 549

How to retry with delay an Observable inside pipe in RxJS?

I have the following code:

notificationsWsSubject.pipe(
    filter((socket): socket is Socket => !!socket),
    switchMap(socket => fromEvent<Socket.DisconnectReason>(socket, 'disconnect')),
    tap(() => wsConnectedSubject.next(false)),
    filter(reason => (['ping timeout', 'transport close', 'transport error'] as Socket.DisconnectReason[]).includes(reason)),
    switchMap(() => signedInObservable),
    switchMap(user => forkJoin([of(user), from(getNotificationsWebsocketTicket())])),
).subscribe(values => {
    // Connect with websocket
}, error  => {
    // Throw error to user
})

The general flow:

My problem is that I would like to retry the from(getNotificationsWebsocketTicket()) in case it fails, with a delay of 5 seconds between each failure.
Only after 3 retries I want the entire main observable to fail.

something like:

from(getNotificationsWebsocketTicket()).pipe(delayedRetry(3, 5000))

Is that possible?

Upvotes: 1

Views: 1370

Answers (1)

Mrk Sef
Mrk Sef

Reputation: 8022

Yeah, you can do this:

from(getNotificationsWebsocketTicket()).pipe(
  retryWhen(e$ => e$.pipe(
    take(3),
    delay(5000)
  ))
);

retryWhen can be a bit of an interesting operator to get your head around. e$ is an observable that is managed by the retryWhen operator. Anytime the source errors, retryWhen emits that error to e$.

What happens next is up to the observable your lambda returns:

Errors are errors,
Completes are completes,
BUT emissions (next) are a cue to retry

Upvotes: 1

Related Questions