Molochdaa
Molochdaa

Reputation: 2218

How to implement polling with backpressure in RXJS

I want to poll a HTTP API that can be slow to answer, so I don't want to make multiple calls to this API at the same time.

An example of what I want to do could be:

const interval = Rx.Observable.interval(250).take(5); // Poll every 250ms

function simulateMaybeSlowHttpCall() {
  return interval.delay(500).take(1); // The service takes 500ms to answer
}

 interval
   .mergeMap(val =>simulateMaybeSlowHttpCall().map(x => val), 1) // max concurrent is 1
   .subscribe(val => console.log(val));

Here, this code will display 1 2 3 4 5

But I don't want to do useless call. The above code runs for 250*5 = 1250 ms, 1 call take 500ms, so I would like to display:

1 3 5

So my question is: when setting concurrent to 1 (or any other values), how can I discard all the call not done immediately ?

JsFiddle: https://jsfiddle.net/zra3zxhs/63/

Upvotes: 2

Views: 678

Answers (1)

cartant
cartant

Reputation: 58400

Using mergeMap with a concurrency of one is equivalent to concatMap. In fact, that's how concatMap is implemented. That's why each interval in your example effects a HTTP request: they are queued.

If you wish to avoid initiating or queuing a HTTP request whilst one is pending, you could use exhaustMap:

interval
  .exhaustMap(val => simulateMaybeSlowHttpCall().map(x => val))
  .subscribe(val => console.log(val));

When exhaustMap is used, any next notifications that it receives are ignored until the inner observable (the HTTP request) completes.

Upvotes: 5

Related Questions