Reputation: 205
Using RxJS 5 i want to solve the following:
Lets say that Im fetching a list of categories of sorts from a REST API.
Based on each and one of those categories, I want to fetch sub categories from another REST endpoint.
Then, based on each and one of those sub categories, I want to fetch products and for each and one of those products we need to fetch the detailed description.
This I have solved. The problem is that the ajax calls escalate and under just a minute over 30k calls are made which results in bringing the server to its knees.
Now, since this is a nightly job Im okay with it taking some time as long as it completes successfully.
This is what I have:
getCategories() // Wraps ajax call and returns payload with array of categories
.switchMap(categories => Observable.from(categories))
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
})
)
.mergeMap(subCategories => Observable.from(subCategories))
.mergeMap(subCategory =>
getProducts(subCategory) // Wraps ajax call and returns payload with array of products
.catch(err => {
console.error('Error when fetching products for sub category:', subCategory);
console.error(err);
return Observable.empty();
})
)
.mergeMap(products => Observable.from(products))
.mergeMap(product =>
getProductDetails(product) // Wraps ajax call and returns payload with product details
.catch(err => {
console.error('Error when fetching product details for product:', product);
console.error(err);
return Observable.empty();
})
)
.mergeMap(productDetails => saveToDb(productDetails))
.catch(err => {
console.error(err);
})
.subscribe();
After the initial request where i fetch the categories, I want to:
Every call made to fetch sub categories should wait until the previous one is done. Only 5 ajax calls should be made at once when fetching products and those products details. After those 5 calls are done we trigger the next 5 calls etc.
Alternatively it could be controlled with time as in waiting x seconds before we do the next ajax call etc.
How would I go about to solve this in a nice with using RxJS based on my example above?
Upvotes: 1
Views: 748
Reputation: 9425
mergeMap
has an overload which takes an optional concurrency parameter. This you can utilize to control how many requests are being sent concurrently to your server.
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
}),
null,
5 /* concurrency */
)
Also; mergeMap will convert anything observableLike to an Observables so if your getSubCategories()
returns an Array that will automatically be converted to an observable, no further Observable.from()
is needed.
Upvotes: 3