Jon Sud
Jon Sud

Reputation: 11661

How to tell interval to wait for the last http call?

I have http take say 10 seconds to complete (sometimes it take 1 second to complete) and interval run every 5 seconds.

I want interval to stop and wait until the http is complete then continue.

This is the reproduce the problem:

codesandbox.io

  refresh$ = interval(5 * 1000).pipe(tap(() => this.load()));

  fakehttp() {
    return timer(10 * 1000).pipe(take(1));
  }

  load() {
    this.fakehttp().subscribe((r) => {
      console.log("data");
    });
  }

  ngOnInit() {
    this.refresh$.subscribe((r) => {
      console.log("refresh!");
    });
  }

Upvotes: 1

Views: 1192

Answers (3)

A. Parolini
A. Parolini

Reputation: 369

A much better approach would use interval along with the repeat and delay operators, so that if the call is blocking, RXJS won't be stacking calls.

This exemple will try every 5 seconds. If the call it taking more that 5 seconds, it will wait.

this.logViewService.readLogs$().pipe(
        catchError((error: Error) => of({} as logViewDto)
        repeat({
            delay: () => interval(5000)
        }),
        takeUntilDestroyed(this.destroyRef)
    ).subscribe((logViewDto: LogViewDto): void => {...})
      

Upvotes: 0

Barremian
Barremian

Reputation: 31125

Using tap to trigger the second observable results in multiple subscriptions.

Option 1

If I understand the question correctly, you're trying to re-trigger the HTTP request after 5 seconds AFTER the current request has emitted. In that case the time interval b/n the calls is dynamic. In that case you'd try to trigger the request manually after each notification using BehaviorSubject.

Try the following

import { timer, BehaviorSubject, Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

export class Sample implements OnInit, OnDestroy {
  timerSrc = new BehaviorSubject<number>(0);  // <-- default time
  close = new Subject<any>();  // <-- use to close open observables

  ngOnInit() {
    this.timerSrc.asObservable().pipe(
      takeUntil(this.close),
      switchMap((time: number) => timer(time)),
      switchMap(() => this.httpRequest())
    ).subscribe({
      next: (response: any) => {
        // use response
        this.timerSrc.next(5000);  // <-- call again in 5 seconds
      },
      error: (error: any) => {
        // handle error
      }
    });
  }

  ngOnDestroy() {
    this.close.next();  // <-- close open subscription 
  }
}

Option 2

If you do not mind have a fixed timer of 5 seconds b/n each successive calls, you could pipe to a timer with 5 second interval using exhaustMap. It'd ignore incoming emissions until the inner observable (the HTTP request) has emitted.

Note that here there would be no guarantee that each successive would have a fixed time interval of 5 seconds. It might be anything b/n 0 - 5 seconds depending on the time taken for the HTTP request to emit.

import { timer, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

export class Sample implements OnInit, OnDestroy {
  close = new Subject<any>();  // <-- use to close open observables

  ngOnInit() {
    timer(0, 5000).pipe(
      takeUntil(this.close),
      exhaustMap(() => this.httpRequest())
    ).subscribe({
      next: (response: any) => {
        // use response
      },
      error: (error: any) => {
        // handle error
      }
    });
  }

  ngOnDestroy() {
    this.close.next();  // <-- close open subscription 
  }
}

Upvotes: 1

VLAZ
VLAZ

Reputation: 29087

You can create your own observable that you manually update. Then set up a recursive timeout() call in load(). That way you always process the result and then wait 10 seconds. You you will not end up with premature update calls:

refresh$ = new Rx.Subject();

fakehttp() {
  return timer(10 * 1000).pipe(take(1));
}

load() {
  this.fakehttp().subscribe((r) => {
    console.log("data");

    //push value to observable
    refresh$.next(r);
    //schedule the next execution
    timer(10 * 1000)
      .subscribe(() => this.load());
  });
}

ngOnInit() {
  this.refresh$.subscribe((r) => {
    console.log("refresh!");
  });

  //start the update cycle
  this.load();
}

The manual observable update approach comes from this answer of luisgabrial;

Upvotes: 0

Related Questions