Isaiah Fasoldt

Reputation: 190

How To Limit Concurrent API Requests In Angular Using RxJS

I have an Angular 7 application that lets you upload multiple files at once. For each file chosen, it makes a separate request to the server. This isn't great, because opening hundreds of concurrent calls seems like a potential problem for my backend servers.

What I'd like to do is limit the number of concurrent requests my application can make. I have a generic API class, and I'd like the limiting to live there so that it applies app-wide, beyond just file uploading, instead of the file-upload component having to manage it.

I'm admittedly confused at times by RxJS, but I'm pretty sure that this is possible.

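import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

class ApiService {

    constructor(private http: HttpClient) {}

    get(path: string, params: any = {}): Observable<any> {
        return this.http.get(path, { params: params });
    }
    
    uploadFile(path: string, body: any = {}): Observable<any> {
        // ...code for preparing file here...
        return this.http.post(path, body);
    }
    
}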

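class FileUploader {

    constructor(private apiService: ApiService) {}

    // called many times, once for each file
    uploadFile(file) {
        // `path` (the upload endpoint) is not defined in this snippet
        this.apiService.uploadFile(path, file).subscribe(response => {
             // use response here
        })
    }
}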

What I imagine is that in the API class, instead of executing the HTTP call immediately in the uploadFile or get functions, I could add it to a queue that has a maximum concurrency and waits to make the call until there is room. But I'm not sure how to do this, given that I subscribe immediately in the FileUploader class.

Upvotes: 2

Views: 2503

Answers (1)

PeS

Reputation: 4039

You can use a Subject together with the mergeMap operator:

import { Subject, of } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

interface FileUpload {
   path: string;
   body: any;
}

export class UploadService {
  private readonly CONCURRENT_UPLOADS = 2;
  private uploadQ = new Subject<FileUpload>();

  constructor(private api: ApiService) {
    this.uploadQ.asObservable().pipe(
      // the concurrency argument of mergeMap caps how many uploads are in flight
      mergeMap(fu => this.api.uploadFile(fu.path, fu.body).pipe(
        // must catch the error here, on the inner observable; otherwise the
        // outer subscription errors out and stops serving the Q
        catchError(err => {
          console.error('Caught error ', err);
          return of(err);
        })), this.CONCURRENT_UPLOADS),
    ).subscribe((res: WhateverResultYouGet) => {
         // process result (or the caught error passed along by of(err))
      }, err => {
        // something went wrong
      });
  }

  // this is your original signature of the method but where do you get path, actually?
  /**
  * Push the file to upload Q
  */
  uploadFile(file) {
    this.uploadQ.next({path, body: file});
  }
}

Instead of firing the upload immediately, you push it onto the queue. The queue is served by the subscription created in the constructor, which uses the mergeMap operator; its concurrency argument is where you specify how many uploads may run at the same time.
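To make the queueing idea concrete without pulling in RxJS, here is a rough, dependency-free sketch of "run at most N, park the rest until a slot frees up". It is only an illustration of the behaviour described above, not how mergeMap is implemented; createLimiter and fakeUpload are hypothetical names, and fakeUpload stands in for a real HTTP call.

```typescript
// Hypothetical helper: returns a function that runs at most `limit` tasks
// at once and queues the rest in arrival order.
function createLimiter(limit: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  // Start the next queued task if a slot is free.
  const next = (): void => {
    if (active >= limit) return;
    const start = waiting.shift();
    if (start) start();
  };

  return function run<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const done = (): void => {
        active--;
        next();
      };
      waiting.push(() => {
        active++;
        task().then(
          value => { done(); resolve(value); },
          error => { done(); reject(error); } // a failed task still frees its slot
        );
      });
      next();
    });
  };
}

// Stand-in for an upload request (assumption: resolves after 10 ms).
let running = 0;
let maxRunning = 0;
function fakeUpload(name: string): Promise<string> {
  running++;
  maxRunning = Math.max(maxRunning, running);
  return new Promise<string>(resolve =>
    setTimeout(() => {
      running--;
      resolve(`uploaded ${name}`);
    }, 10)
  );
}

const limit = createLimiter(2);
const results = Promise.all(
  ['a', 'b', 'c', 'd', 'e'].map(name => limit(() => fakeUpload(name)))
);
results.then(r => console.log(r, 'max in flight:', maxRunning));
```

Unlike the Subject-based service, each caller here gets its own promise back, so per-file handling stays with the caller while the limit is still enforced in one place.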

Upvotes: 7
