The problem:
I have a list of URLs. I have an Observable method that uses a URL to grab a file, download it, and store it locally. I would like to launch these requests in parallel, but only allow 4 concurrent requests at once (I generate PDFs server side and want to reduce the load). In addition, this download step must emit only once, after all of the URLs have been downloaded.
Current solution
Right now, I just launch all the requests at once and use forkJoin. After searching for a couple of days, I have come across a few solutions on here that have given me some ideas, but they don't do exactly what I want. My main source is here.
export function limitedParallelObservableExecution<T>(listOfItems: Array<T>, observableMethod: (item: T) => Observable<any>): Observable<any> {
    const MAX_CONCURRENCY = 4;
    if (listOfItems && listOfItems.length > 0) {
        let observableListOfItems: Observable<Observable<any>> = Observable.from(listOfItems).map(
            (item: T) => observableMethod(item)
        );
        return observableListOfItems.merge(MAX_CONCURRENCY);
    } else {
        return Observable.of({});
    }
}
I have another download step that is flatMapped to execute once this step completes. However, instead of just executing once, the next step executes once for every url in the list (as I understand it, this is because it emits once for each url that completes).
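A minimal sketch of the behaviour described above, using synchronous stand-in values instead of real downloads (the `urls` array and both counters are hypothetical). `mergeMap` (which `flatMap` aliases) runs its project function once for every upstream value, so a stream that emits one value per URL runs the next step once per URL. Putting `toArray()` in front collapses the stream into a single emission, so the next step runs once.

```typescript
import { of } from "rxjs";
import { mergeMap, toArray } from "rxjs/operators";

// Hypothetical stand-ins for the files produced by the download step.
const urls = ["a.pdf", "b.pdf", "c.pdf"];

let nextStepCallsWithoutToArray = 0;
let nextStepCallsWithToArray = 0;

// Without toArray(): the "next step" runs once for each emitted value.
of(...urls)
  .pipe(
    mergeMap(() => {
      nextStepCallsWithoutToArray++;
      return of("next step done");
    })
  )
  .subscribe();

// With toArray(): a single array is emitted when the source completes,
// so the "next step" runs exactly once.
of(...urls)
  .pipe(
    toArray(),
    mergeMap((allFiles) => {
      nextStepCallsWithToArray++;
      return of(allFiles.length);
    })
  )
  .subscribe();

console.log(nextStepCallsWithoutToArray, nextStepCallsWithToArray); // 3 1
```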
How do I maintain this concurrency while only returning once when all of my downloads have completed?
In addition, this still seems to launch all of my requests at once. Is there a better way to limit the number of simultaneous requests? That is, launch n requests in parallel, and launch request n + 1 only once one of the first n has completed?
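One possible reason all requests still start at once, assuming `observableMethod` begins its download as soon as it is called (for example, because it wraps a Promise) rather than when its Observable is subscribed: `map(item => observableMethod(item))` calls it for every item immediately, and no later operator can undo that. `mergeMap(fn, n)`, by contrast, keeps waiting items in a buffer and calls `fn` only when one of its `n` slots is free. In this sketch `startDownload` is a hypothetical stand-in that only records which URLs it was called with.

```typescript
import { from, Observable, Subject } from "rxjs";
import { map, mergeMap } from "rxjs/operators";

const urls = ["u1", "u2", "u3", "u4", "u5", "u6"];
const started: string[] = [];

// Hypothetical eager download: the work starts when the function is called
// (as with a Promise-based request), not when the Observable is subscribed.
// The returned Subject never completes, so every started download stays in flight.
function startDownload(url: string): Observable<string> {
  started.push(url);
  return new Subject<string>();
}

// map() calls startDownload for every URL as soon as the source emits it.
from(urls).pipe(map((url) => startDownload(url))).subscribe();
const eagerCalls = started.length;

// mergeMap(fn, 2) buffers the remaining URLs and calls fn only when a slot is free.
started.length = 0;
from(urls).pipe(mergeMap((url) => startDownload(url), 2)).subscribe();
const limitedCalls = started.length;

console.log(eagerCalls, limitedCalls); // 6 2
```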
Extra code examples
Here is a code snippet showing how I launch each download step only once the previous one has completed:
).flatMap(
    (uploadFlightActualsSuccess) => {
        this.changeProgressValue(this.FLIGHT_ACTUALS_UPLOAD_END);
        return this.syncDocuments();
    }
).flatMap(
    (syncDocumentsSuccess) => {
        this.changeProgressValue(this.OPERATOR_DOCUMENT_DOWNLOAD_END);
        return this.syncTripDocuments();
    },
    (error) => error
).flatMap(
    (syncTripDocumentsSuccess) => {
        this.changeProgressValue(this.TRIP_DOCUMENT_DOWNLOAD_END);
        return this.expenseItemSyncProvider.syncPortalData();
    }
).flatMap(
    (expenseItemSyncSuccess) => {
        return this.flightPersonnelSyncProvider.syncFlightPersonnelByTrip();
    }
).flatMap(
'syncTripDocuments' is the request that downloads the list of URLs. I only want to run the next step once all of those downloads have completed.
The issue with the posted solutions (although they gave me the concurrency control I wanted) was that they did not satisfy the requirement that the entire operation emit only once, after every item had completed.
The working solution is as follows:
import { toArray, mergeMap } from "rxjs/operators";
import { of, from, Observable } from "rxjs";
export function limitedParallelObservableExecution<T>(
    listOfItems: Array<T>,
    observableMethod: (item: T) => Observable<any>,
    maxConcurrency: number = 4
): Observable<any> {
    if (listOfItems && listOfItems.length > 0) {
        const observableListOfItems: Observable<T> = from(listOfItems);
        return observableListOfItems.pipe(
            mergeMap(observableMethod, maxConcurrency), // at most maxConcurrency downloads in flight
            toArray() // emit once, with all results, after every download has completed
        );
    } else {
        return of({});
    }
}
The strategy here is to:
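1) Create an observable stream from the list of items.
2) Pass the observable method into mergeMap, along with maxConcurrency, so that mergeMap subscribes to at most maxConcurrency downloads at a time and starts the next item only when one of them completes.
3) Use toArray() to collect every result into a single array, which is emitted only once the source and all of the downloads have completed.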
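To check the steps above without a server, here is a sketch that copies `limitedParallelObservableExecution` and drives it with hypothetical fake downloads (`fakeDownload` and `finish` are not part of the answer). Each fake download stays in flight until `finish(url)` is called, so the whole run is synchronous and deterministic.

```typescript
import { from, Observable, of, Subject } from "rxjs";
import { mergeMap, toArray } from "rxjs/operators";

// Copy of the answer's function so this sketch is self-contained.
function limitedParallelObservableExecution<T>(
  listOfItems: Array<T>,
  observableMethod: (item: T) => Observable<any>,
  maxConcurrency: number = 4
): Observable<any> {
  if (listOfItems && listOfItems.length > 0) {
    return from(listOfItems).pipe(mergeMap(observableMethod, maxConcurrency), toArray());
  }
  return of({});
}

// Hypothetical fake download: registered as in flight when called,
// finished only when finish(url) is invoked.
const inFlight = new Map<string, Subject<string>>();
let maxInFlight = 0;
function fakeDownload(url: string): Observable<string> {
  const done = new Subject<string>();
  inFlight.set(url, done);
  maxInFlight = Math.max(maxInFlight, inFlight.size);
  return done;
}
function finish(url: string): void {
  const done = inFlight.get(url)!;
  inFlight.delete(url);
  done.next(`/local/${url}`); // pretend path of the stored file
  done.complete();
}

const emissions: string[][] = [];
limitedParallelObservableExecution(["a", "b", "c", "d", "e", "f"], fakeDownload, 4)
  .subscribe((paths) => emissions.push(paths));

const startedBeforeAnyFinished = inFlight.size; // the other 2 URLs wait inside mergeMap
// Finish downloads one by one; each completion lets mergeMap start a waiting URL.
while (inFlight.size > 0) {
  finish(inFlight.keys().next().value as string);
}

console.log(startedBeforeAnyFinished, maxInFlight, emissions.length, emissions[0].length); // 4 4 1 6
```

Because `toArray()` collects values as they arrive, the resulting array is in completion order, which can differ from the order of `listOfItems`.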
Here's one way to do it: use the zip operator to throttle the requests.
Start with two streams. The first is the sequence of URLs to download; the second is a sequence of 4 tokens (one per allowed concurrent download), something like this:
import { from, ReplaySubject } from "rxjs";
const s1$ = from(list_of_urls);
const s2$ = new ReplaySubject();
for (let i = 0; i < 4; i++) s2$.next(i);
Then zip these two together and use mergeMap to download the files. Once each download finishes (that is, emits its result), push a new value into s2$ so that the next URL can proceed, something like this:
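import { mergeMap, tap, zip } from "rxjs/operators";
// download_url(url) stands for your own function returning an Observable of the saved file
const s3$ = s1$.pipe(
    zip(s2$), // pair each URL (a) with a token (b)
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c)))) // hand a token back when done
);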
So now, every time a file finishes downloading, a new element is emitted on s2$, allowing the next zipped pair to be processed.
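A runnable sketch of this zip-based throttle, assuming RxJS 6 or 7 and using the static `zip` function rather than the pipeable operator. `downloadUrl` and `finish` are hypothetical stand-ins for `download_url`: a fake download stays in flight until `finish(url)` is called, which keeps the run synchronous.

```typescript
import { from, Observable, ReplaySubject, Subject, zip } from "rxjs";
import { mergeMap, tap } from "rxjs/operators";

// Hypothetical stand-in for download_url: in flight until finish(url) is called.
const inFlight = new Map<string, Subject<string>>();
let maxInFlight = 0;
function downloadUrl(url: string): Observable<string> {
  const done = new Subject<string>();
  inFlight.set(url, done);
  maxInFlight = Math.max(maxInFlight, inFlight.size);
  return done;
}
function finish(url: string): void {
  const done = inFlight.get(url)!;
  inFlight.delete(url);
  done.next(`/local/${url}`);
  done.complete();
}

const s1$ = from(["a", "b", "c", "d", "e", "f"]);
const s2$ = new ReplaySubject<unknown>();
for (let i = 0; i < 4; i++) s2$.next(i); // 4 tokens = at most 4 downloads at once

const results: string[] = [];
let completed = false;
zip(s1$, s2$)
  .pipe(
    // Each URL waits for a token; a finished download hands a new token back to s2$.
    mergeMap(([url]) => downloadUrl(url).pipe(tap((path) => s2$.next(path))))
  )
  .subscribe({
    next: (path) => results.push(path),
    complete: () => {
      completed = true;
    },
  });

const startedBeforeAnyFinished = inFlight.size;
while (inFlight.size > 0) {
  finish(inFlight.keys().next().value as string);
}

console.log(startedBeforeAnyFinished, maxInFlight, results.length, completed); // 4 4 6 true
```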
EDIT
Alternatively, we can use a plain Subject instead of a ReplaySubject and move the for loop that emits the first four values to after the subscription to s3$:
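import { from, Subject } from "rxjs";
import { mergeMap, tap, zip } from "rxjs/operators";
const s1$ = from(list_of_urls);
const s2$ = new Subject();
const s3$ = s1$.pipe(
    zip(s2$),
    mergeMap(([a, b]) => download_url(a).pipe(tap(c => s2$.next(c))))
);
s3$.subscribe(...);
// Emit the first four tokens only after subscribing; a plain Subject would drop them otherwise
for (let i = 0; i < 4; i++) s2$.next(i);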
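The order of the last two lines matters because a plain `Subject` does not replay: tokens emitted before `s3$` is subscribed reach no one and are lost. A small sketch of both orders, using a hypothetical `downloadUrl` that never finishes (so only the start-up is observed) and the static `zip` function in place of the pipeable operator.

```typescript
import { from, Observable, Subject, zip } from "rxjs";
import { mergeMap, tap } from "rxjs/operators";

const urls = ["a", "b", "c", "d", "e", "f"];

// Builds the throttled pipeline and reports how many downloads start,
// priming the Subject with 4 tokens either before or after subscribing.
function countStarted(primeBeforeSubscribe: boolean): number {
  const started: string[] = [];
  // Hypothetical download_url stand-in that never finishes.
  const downloadUrl = (url: string): Observable<string> => {
    started.push(url);
    return new Subject<string>();
  };
  const s2$ = new Subject<unknown>();
  const prime = () => {
    for (let i = 0; i < 4; i++) s2$.next(i);
  };
  const s3$ = zip(from(urls), s2$).pipe(
    mergeMap(([url]) => downloadUrl(url).pipe(tap((c) => s2$.next(c))))
  );
  if (primeBeforeSubscribe) prime();
  s3$.subscribe();
  if (!primeBeforeSubscribe) prime();
  return started.length;
}

console.log(countStarted(true), countStarted(false)); // 0 4
```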
EDIT 2
Instead of a for loop to create the first 4 elements, we can zip with from([1,2,3,4]).pipe(concat(s2$)) instead of simply with s2$.
I haven't run any of this, but you get the general idea.
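A sketch of the EDIT 2 variant, assuming RxJS 6 or 7 and using the static `concat` and `zip` functions instead of their pipeable forms. `concat(from([1, 2, 3, 4]), s2$)` emits four tokens immediately and then forwards whatever arrives on `s2$`. Since `concat` subscribes to `s2$` as soon as the four tokens are out, no token is lost with a plain `Subject`, as long as a download does not emit synchronously. `downloadUrl` and `finish` are hypothetical stand-ins for `download_url` that keep each fake download in flight until `finish(url)` is called.

```typescript
import { concat, from, Observable, Subject, zip } from "rxjs";
import { mergeMap, tap } from "rxjs/operators";

// Hypothetical stand-in for download_url: in flight until finish(url) is called.
const inFlight = new Map<string, Subject<string>>();
let maxInFlight = 0;
function downloadUrl(url: string): Observable<string> {
  const done = new Subject<string>();
  inFlight.set(url, done);
  maxInFlight = Math.max(maxInFlight, inFlight.size);
  return done;
}
function finish(url: string): void {
  const done = inFlight.get(url)!;
  inFlight.delete(url);
  done.next(`/local/${url}`);
  done.complete();
}

const s2$ = new Subject<unknown>();
// Four initial tokens first, then every token handed back by a finished download.
const tokens$ = concat(from([1, 2, 3, 4]), s2$);

const results: string[] = [];
zip(from(["a", "b", "c", "d", "e", "f"]), tokens$)
  .pipe(mergeMap(([url]) => downloadUrl(url).pipe(tap((path) => s2$.next(path)))))
  .subscribe((path) => results.push(path));

const startedBeforeAnyFinished = inFlight.size;
while (inFlight.size > 0) {
  finish(inFlight.keys().next().value as string);
}

console.log(startedBeforeAnyFinished, maxInFlight, results.length); // 4 4 6
```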