Reputation: 11661
I have http take say 10 seconds to complete (sometimes it take 1 second to complete) and interval run every 5 seconds.
I want interval to stop and wait until the http is complete then continue.
This is the reproduce the problem:
refresh$ = interval(5 * 1000).pipe(tap(() => this.load()));
fakehttp() {
return timer(10 * 1000).pipe(take(1));
}
load() {
this.fakehttp().subscribe((r) => {
console.log("data");
});
}
ngOnInit() {
this.refresh$.subscribe((r) => {
console.log("refresh!");
});
}
Upvotes: 1
Views: 1192
Reputation: 369
A much better approach would use interval
along with the repeat
and delay
operators, so that if the call is blocking, RXJS won't be stacking calls.
This exemple will try every 5 seconds. If the call it taking more that 5 seconds, it will wait.
this.logViewService.readLogs$().pipe(
catchError((error: Error) => of({} as logViewDto)
repeat({
delay: () => interval(5000)
}),
takeUntilDestroyed(this.destroyRef)
).subscribe((logViewDto: LogViewDto): void => {...})
Upvotes: 0
Reputation: 31125
Using tap
to trigger the second observable results in multiple subscriptions.
If I understand the question correctly, you're trying to re-trigger the HTTP request after 5 seconds AFTER the current request has emitted. In that case the time interval b/n the calls is dynamic. In that case you'd try to trigger the request manually after each notification using BehaviorSubject
.
Try the following
import { timer, BehaviorSubject, Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';
export class Sample implements OnInit, OnDestroy {
timerSrc = new BehaviorSubject<number>(0); // <-- default time
close = new Subject<any>(); // <-- use to close open observables
ngOnInit() {
this.timerSrc.asObservable().pipe(
takeUntil(this.close),
switchMap((time: number) => timer(time)),
switchMap(() => this.httpRequest())
).subscribe({
next: (response: any) => {
// use response
this.timerSrc.next(5000); // <-- call again in 5 seconds
},
error: (error: any) => {
// handle error
}
});
}
ngOnDestroy() {
this.close.next(); // <-- close open subscription
}
}
If you do not mind have a fixed timer of 5 seconds b/n each successive calls, you could pipe to a timer with 5 second interval using exhaustMap
. It'd ignore incoming emissions until the inner observable (the HTTP request) has emitted.
Note that here there would be no guarantee that each successive would have a fixed time interval of 5 seconds. It might be anything b/n 0 - 5 seconds depending on the time taken for the HTTP request to emit.
import { timer, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';
export class Sample implements OnInit, OnDestroy {
close = new Subject<any>(); // <-- use to close open observables
ngOnInit() {
timer(0, 5000).pipe(
takeUntil(this.close),
exhaustMap(() => this.httpRequest())
).subscribe({
next: (response: any) => {
// use response
},
error: (error: any) => {
// handle error
}
});
}
ngOnDestroy() {
this.close.next(); // <-- close open subscription
}
}
Upvotes: 1
Reputation: 29087
You can create your own observable that you manually update. Then set up a recursive timeout()
call in load()
. That way you always process the result and then wait 10 seconds. You you will not end up with premature update calls:
refresh$ = new Rx.Subject();
fakehttp() {
return timer(10 * 1000).pipe(take(1));
}
load() {
this.fakehttp().subscribe((r) => {
console.log("data");
//push value to observable
refresh$.next(r);
//schedule the next execution
timer(10 * 1000)
.subscribe(() => this.load());
});
}
ngOnInit() {
this.refresh$.subscribe((r) => {
console.log("refresh!");
});
//start the update cycle
this.load();
}
The manual observable update approach comes from this answer of luisgabrial;
Upvotes: 0