Patrick
Patrick

Reputation: 909

RxJS (Interval) Observable wait for last observable to complete

I want an interval observable which waits for the last execution. This are my tries.

Simple thing which not wait.

interval(1000)
  .subscribe(async x => {
    await new Promise(resolve => setTimeout(resolve, Math.floor(Math.random() * 10000) + 1000));
    console.log('Got counter', x);
  });

Results in: 4, 1, 2, 6, 9, 7, 6, 3, ...

Next try but a bit bad.

let alreadyRunning = false;
interval(1000)
  .pipe(skipWhile(() => alreadyRunning))
  .subscribe(async x => {
    alreadyRunning = true;
    await new Promise(resolve => setTimeout(resolve, Math.floor(Math.random() * 10000) + 1000));
    console.log('Got counter', x, alreadyRunning);
    alreadyRunning = false;
  });

skipWhile waits only before the first thing is true.

Now I tried switchMap which also not works.

interval(1000)
  .pipe(switchMap(() => from(new Promise(resolve => setTimeout(resolve, Math.floor(Math.random() * 10000) + 1000)))))
  .subscribe(async x => {
    console.log('Got counter', x);
  });

Also not working:

interval(1000)
  .pipe(switchMap(x => from(async () => {
    await new Promise(resolve => setTimeout(resolve, Math.floor(Math.random() * 10000) + 1000));
    console.log('Got counter', x);
    return x;
  })))
  .subscribe(async x => {
    console.log('X', x);
  });

Is there a solution to realize that? Waiting for last observable to finish? After the subscribe there is no chance to do that. So how I can do this before.

//edit 1: What I want?

I have an interval which executes an HTTP Request within. So when the HTTP Request waits for few seconds the next interval will be executed so that the request ist executed multiple times.

That I want to avoid.

MergeMap is not working, too.

interval(1000)
  .pipe(mergeMap(x => from(new Promise(resolve => setTimeout(() => resolve(x), Math.floor(Math.random() * 10000) + 1000)))))
  .subscribe(async x => {
    console.log('Got counter', x);
  });

Upvotes: 0

Views: 1336

Answers (1)

Mrk Sef
Mrk Sef

Reputation: 8022

I'm not sure what you're asking, but if user AJT82 is right (Ie, You'd like to drop any emissions from the interval while the previous request hasn't completed)

Here's how I would implement this:

interval(1000).pipe(
  tap(_ => console.log('interval hits every second!')),
  exhaustMap(_ => {
    console.log("request has started!")
    return timer(3000).pipe( // simulation that takes 3 seconds to complete
      tap(_ => console.log("request has finished!"))
    )
  })
).subscribe({
  next: v => console.log("Value emitted: ", v),
  error: e => console.log("Error emitted: ", e),
  complete: () => console.log("Complete emitted")
});

An Aside on async/await

Observables are a strict superset of promises. This means that if you can do something with a promise, you can do it with an observable as well.

Unlike Promises though, Observables don't have special syntactic sugar like async/await.

Observables interoperate with Promises fairly well. Most operators work with promises as well as with observables by converting promises for you in the background.

This makes it helpful if your project is using one and you'd like to include a library that uses the other.

On the other hand, if your code is already using the machinery of observables, there is never ever a reason to use promises. This is a code-smell should be avoided unless you really have pretty good reasons.

Upvotes: 2

Related Questions