shaunmwa

Reputation: 136

RxJS design pattern for multiple async calls

I am new to RxJS and haven't been able to find clear answers on the following use case:

In a mobile app (Angular/Ionic), I need to (1) make simultaneous HTTP calls and only return data when all have completed (like $q.all). I want to (2) throw an error if the calls work correctly but a nested value in one of the responses meets a certain criterion (e.g. the user is not authenticated properly). Because it's a mobile app, I want to (3) build in a few retry attempts if the calls don't work correctly (for any reason). If the calls still fail after a certain number of retry attempts, I want to (4) throw an error.

Based on my research, it seems forkJoin works the same way as $q.all. I have a provider that returns the following (observableArray holds the HTTP calls).

return Observable.forkJoin(observableArray)

And then I can pipe in some operators, which is where I'm starting to struggle. For checking the nested value (2), I am using an underscore method to iterate over each response in my response array. This doesn't seem clean at all.

For retrying the calls (3), I am using retryWhen and delayWhen. But I am unsure how to limit this to 3 or 4 attempts.

And if the limit is hit, how would I throw an error back to the subscribers (4)?

.pipe(
    map(
        res => {
            _.each(res, (obs) => {
                if (!obs['success']) throw new Error('success was false')
            })
        }
    ),
    retryWhen(attempts =>
        attempts.pipe(
            tap(err => console.log('error:', err),
                delayWhen(() => timer(1000))
            )
        )
    ))

Upvotes: 0

Views: 2196

Answers (2)

Picci

Reputation: 17762

A possibility to consider is to pipe the various operators you need, e.g. retry and map, into each Observable contained in the observableArray you pass to forkJoin.

The code could look something like this:

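import { Observable, forkJoin } from 'rxjs';
import { delay, map, retryWhen, take } from 'rxjs/operators';

const observableArray = new Array<Observable<any>>();
const maxRetries = 4;

function pipeHttpObservable(httpObs: Observable<any>): Observable<any> {
   return httpObs
      .pipe(
         // Throw inside map so the error reaches retryWhen; returning
         // throwError(...) from map would emit an Observable as a value.
         map(data => {
            if (!data.success) throw new Error('success was false');
            return data;
         }),
         // Resubscribe after 1 second, at most maxRetries times.
         retryWhen(errors => errors.pipe(delay(1000), take(maxRetries)))
      )
}

observableArray.push(pipeHttpObservable(httpObs1));
observableArray.push(pipeHttpObservable(httpObs2));
// .....
observableArray.push(pipeHttpObservable(httpObsN));

forkJoin(observableArray).subscribe(result => { /* do stuff with the results */ })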

Upvotes: 0

CozyAzure

Reputation: 8478

There are a couple of tricks here to make your code clean.

1. Use iif():

iif accepts a condition function and two Observables. When an Observable returned by the operator is subscribed, condition function will be called. Based on what boolean it returns at that moment, consumer will subscribe either to the first Observable (if condition was true) or to the second (if condition was false).

2. Use JavaScript's native Array every():

The every() method tests whether all elements in the array pass the test implemented by the provided function.

3. Use take() to terminate your retryWhen

Emits only the first count values emitted by the source Observable.
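The every() check from point 2 can be sketched as a standalone function, which keeps it testable outside the stream. This is only an illustration: the ApiResponse interface and the name assertAllSucceeded are hypothetical, and only the success flag comes from the question.

```typescript
// Hypothetical response shape: only the `success` flag comes from the question.
interface ApiResponse {
    success: boolean;
}

// Returns the responses unchanged when every one reports success;
// throws otherwise. Note that every() returns true for an empty array.
function assertAllSucceeded<T extends ApiResponse>(responses: T[]): T[] {
    if (!responses.every(({ success }) => success)) {
        throw new Error('success was false');
    }
    return responses;
}
```

Passing a function like this to map() after forkJoin should turn a failed check into an error notification, since an exception thrown inside map's callback is delivered to the error channel.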

So your code boils down to:

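// assumes: import { iif, of, throwError } from 'rxjs';
//          import { delay, retryWhen, switchMap, take } from 'rxjs/operators';
.pipe(
    switchMap((res) => iif(
        () => res.every(({success}) => success),
        of(res), // if every element in res is successful, emit the whole array
        throwError('success was false') // else if any of it is false, throw error.
        )
    ),
    // wait 1 second before each retry, and stop after 4 retries
    retryWhen(errors => errors.pipe(delay(1000), take(4)))
)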

Edit:

If you want to catch the error in your subscribe callback, you need to rethrow it. take() only terminates the notifier sequence, i.e. completes it, so the stream completes without ever raising an error:

// assumes: import { iif, of, throwError } from 'rxjs';
//          import { delay, retryWhen, scan, switchMap } from 'rxjs/operators';
.pipe(
    switchMap((res) => iif(
        () => res.every(({success}) => success),
        of(res), // if every element in res is successful, emit the whole array
        throwError('success was false') // else if any of it is false, throw error.
        )
    ),
    retryWhen(errors =>
        errors.pipe(
            scan((errorCount, err) => {
                if (errorCount >= 4) throw err; // already retried 4 times: rethrow to subscribers
                return errorCount + 1; // else just increment the count
            }, 0),
            delay(1000) // wait 1 second before each retry
        )
    )
)
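The counting logic passed to scan can also be pulled out into a plain function, which makes the retry limit easy to check without running any HTTP calls. This is a sketch under assumptions: makeRetryCounter is a hypothetical name, not part of RxJS.

```typescript
// Hypothetical helper: builds an accumulator suitable for scan() inside retryWhen.
function makeRetryCounter(maxRetries: number) {
    return (errorCount: number, err: unknown): number => {
        if (errorCount >= maxRetries) {
            // Limit reached: rethrow so the error propagates to subscribers.
            throw err;
        }
        // Return the incremented value explicitly; a post-increment
        // expression would return the old count and never advance.
        return errorCount + 1;
    };
}
```

It would then be used as scan(makeRetryCounter(4), 0) on the errors stream, so the limit lives in one place.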

Upvotes: 3
