TalVik99
TalVik99

Reputation: 149

Repeat HTTP call until the desired value is returned with RxJs

I need to repeat the query if the field of the returned object has the value 'INPROGRESS' with a delay so as not to clog up the server. If another field value is returned, the loop stops and I perform some action in subscribe() with its response.

My attempts so far have ended up with this code, where unfortunately the queries repeat infinitely.

this.service
.query(id: number)
.pipe(
  repeatWhen(obs => obs.pipe(delay(1000))),
  filter((response) => response.Status === 'INPROGRESS'),
  take(1),
)
.subscribe(...)

Upvotes: 5

Views: 4542

Answers (4)

Eddy Lin
Eddy Lin

Reputation: 683

  • RxJS 6.x
this.service.query(id: number).pipe(
  repeatWhen(delay(1000)),
  skipWhile((response) => response.Status === 'INPROGRESS'),
  take(1),
).subscribe(...)

https://stackblitz.com/edit/rxjs-cc1ekf

  • RxJS 7.5.x
this.service.query(id: number).pipe(
  repeat({ delay: 1000 }),
  skipWhile((response) => response.Status === 'INPROGRESS'),
  take(1),
).subscribe(...)

https://stackblitz.com/edit/rxjs-kafps9

Upvotes: 3

Barremian
Barremian

Reputation: 31125

This is the straight forward way I could think of without any recursions.

  1. timer function to emit a value at a regular interval
  2. switchMap operator to trigger the HTTP request for each emit from timer
  3. filter operator to stop triggering the subscribe's next callback for each emit.
  4. takeWhile operator with inclusive parameter set to true to close the observable based on a condition and emit the last value that failed the condition. This could in this case also be take(1) since our filter condition would block the other emits.
const RESPONSE_IN_PROGRESS: string = 'INPROGRESS';
const POLL_INTERVAL: number = 1000;

timer(0, POLL_INTERVAL).pipe(
  switchMap(() => this.service.query(id)),              // <-- trigger the request
  filter((res: any) =>                                  // <-- stop triggering subscribe `next` callback for each emit
    res.Status !== RESPONSE_IN_PROGRESS
  ),         
  takeWhile(                                            
    (res: any) => res.Status === RESPONSE_IN_PROGRESS,  // <-- close the observable when the condition fails
    true                                                // <-- emit the last notification that failed the condition
  ) 
).subscribe({
  next: (res: any) => {
    // handle response
  },
  error: (error: any) => {
    // handle error
  }
});

Upvotes: 2

Chellappan வ
Chellappan வ

Reputation: 27303

rxjs has expand operator which will allow us to make recurive api call

Try this

 this.service
      .query(id: number)
      .pipe(
        delay(1000),
        expand((response: any) => {
          if (response.Status === 'INPROGRESS') {
            return return this.service
          .query(id: number).pipe(delay(1000));
          }
          return EMPTY;
        }),
        takeLast(1)
      )
      .subscribe(...)

Upvotes: 8

Zerotwelve
Zerotwelve

Reputation: 2361

with a recursie function, something like this should do the job:

private myFunc(){
    this.myRecursiveFunc(1).subscribe(response => console.log(response));
}


private myRecursiveFunc(id: number, response?:any):Observable<any>{
    if(response && response.Status !== 'INPROGRESS'){
        return of(response);
    }
    return this.service.query(id).pipe(
        delay(1000),
        concatMap(response => this.myRecursiveFunc(id, response)
    );
}

Upvotes: 1

Related Questions