Tom O'Brien
Tom O'Brien

Reputation: 1831

Angular RXJS Polling from within nested observables

I have a service which is used by a resolver to generate and return a report.

The initial get in the service calls a REST endpoint /report which kicks off a worker job on the server,as the report is processor intensive and takes over 30 seconds to run. The report endpoint returns the id of the worker job.

I then need to poll the workder job REST endpoint /job/job_id with the relevant id for the job. I continue to polling untill it returns a state 'completed', and contains the finished report.

This final output is then returned from the service and the resolver consumes it.

I have been unable to get this working with the polling. I pipe the response to the initial report endpoint into a switchMap and then use an interval to repeatedly poll every 500ms the /job/job_id endpoint. I then try and switchMap the poll repsonse and return if complete. This is my first time using switchMap and polling, so i'm not sure if i am using this correctly.

Here is my latest attempt of the code:

getDepartmentReport() {

  return this.http
    .get<any>(reportUrl, this.getAuthOptions(true))
    .pipe(switchMap(initialResponse => {

      interval(500).pipe(
        switchMap(() => {
          return this.http.get<any>(workerUrl + initialResponse.id, this.getAuthOptions(true))
            .pipe(
              switchMap(pollResponse => {
                if(pollResponse.state === 'completed') {
                  return pollResponse;
                }
            })
      }));
  }));
}

This in fact won't compile. It gives the following error:

Argument of type '(initialResponse: any) => void' is not assignable to parameter of type '(value: any, index: number) => ObservableInput<any>'.
  Type 'void' is not assignable to type 'ObservableInput<any>'.

56         .pipe(switchMap(initialResponse => {

I assume this is happening because upon non complete polling responses, there is no return statement to handle that situation and a void is returning.

Anybody got any ideas? I'm stumped.

Upvotes: 1

Views: 761

Answers (1)

Andrei Gătej
Andrei Gătej

Reputation: 11924

This is an interesting problem.

You're getting that error because switchMap must return an Observable. In your code, you're not returning anything, you just start an interval.

You also have to tell your interval when to stop polling. This can be achieved with the help of the takeWhile operator. In order to separate things a bit more, I've created a custom operator in which the polling will take place. Doing things this way, you can reuse this operator in other places as well.

Here is my approach:

// ===== Server =====

let crtReportId = 1;
let crtReportStatus: { status: string, id: number };

const getReportFromBE = () => {
  let initialId = crtReportId;

  crtReportStatus = { status: 'pending', id: initialId };

  // It takes some time...
  timer(2000)
    .subscribe(() => crtReportStatus = { status: 'completed', id: initialId })

  return of(crtReportId++);
}

const getWorkerStatus = id => of(crtReportStatus);

// ===== Client =====

type CustomPollOperator = (data: any, cond: (d) => boolean, ms: number) => Observable<any>

const pollFor: CustomPollOperator = (data, cond, ms) => {
  let shouldPoll = true;

  return interval(ms)
    .pipe(
      tap(() => console.warn('pooling', shouldPoll)),
      takeWhile(() => shouldPoll),
      switchMap(() => getWorkerStatus(data)),
      tap(res => {
        if (cond(res)) {
          shouldPoll = false;
        }
      })
    )
}

const isWorkerCompleted = w => w.status === 'completed';

const getReports = () => {
  return getReportFromBE()
    .pipe(
      switchMap(workerId => pollFor(workerId,isWorkerCompleted, 200))
    )
}

getReports().subscribe((res) => console.log('result', res))

StackBlitz.

Upvotes: 1

Related Questions