ovg
ovg

Reputation: 1546

RxJS retry entire chain

I read images from a live stream and select a batch periodically. I then send them to the server for validation. A HTTP error will be thrown if any fail validation. If that occurs I want to get a new batch of images.

this.input.getImages()
  .throttleTime(500)
  .switchMap(image =>

      new Observable<{}>(observer => {
        // Some operation
      })
      .map(i => ({ image, i }))

  ).filter(({ i }) =>  {
    // some filtering
  })
  .map(({ image }) => image)
  .take(6)
  .bufferCount(6)
  .map(images =>  // switch map??
    Observable.fromPromise(this.server.validate(images))
  )
  .retry(2)  // This only retrys the request, I want it to retry the whole chain (to get valid images)
  .subscribe(images => {
      console.log('All done')
    },
    err => {console.log(err)}
  )

The problem I'm having is that only the HTTP request gets retried since that is the new observable. There must be some way to encapsulate the beginning of the chain into a single Observable?

Upvotes: 2

Views: 2191

Answers (2)

Andrey Nikolaev
Andrey Nikolaev

Reputation: 368

Simple way is to wrap your complex observable in defer ans use retry on resulting observable.

Upvotes: 1

Richard Matsen
Richard Matsen

Reputation: 23533

See learnrxjs - retry. The example shows everything restarting from source onwards when an error is thrown.

The page shows pipe syntax, but the JSBin shows fluid operator syntax if you prefer.

The basic pattern is

const retryMe = this.input.getImages()
  .flatMap(val => {
    Observable.of(val)
      // more operators
  })
  .retry(2);

Upvotes: 1

Related Questions