Frank

Reputation: 745

RxJS Make call no more than once per second, but don't lose any calls

Basically I'd like to create a queue. Something like

const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))

where I might call

queue.next({arg1, arg2, arg3})

in several places, sometimes in quick succession. I can't use throttle or debounce because they drop intermediate values, and I can't lose any calls. Every call has to go through, but no more than 1 per second. If two fire within a second of each other, one of them has to wait 1 second. If three fire within a second, one waits 1 second and another waits 2 seconds.

Upvotes: 2

Views: 780

Answers (2)

Leon Radley

Reputation: 7672

I recently found myself in the same situation: the API I was consuming could only take 4 requests per second.

This is what I came up with.

A rateLimit pipe

import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'

export function rateLimit<T>(
  count: number,
  slidingWindowTime: number,
  scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
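  // Tokens still free in the current window; each emitted value consumes one
  let tokens = count
  const tokenChanged = new BehaviorSubject(tokens)
  const consumeToken = () => tokenChanged.next(--tokens)
  const renewToken = () => tokenChanged.next(++tokens)
  // Emits on every token change, but only while at least one token is free
  const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))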

  return mergeMap<T, Observable<T>>((value: T) =>
    availableTokens.pipe(
      take(1),
      map(() => {
        consumeToken()
        timer(slidingWindowTime, scheduler).subscribe(renewToken)
        return value
      }),
    ),
  )
}

You can use it like this. Here I want to fetch every contract in contractIds$ from the API while sending at most 4 requests every 1000 ms.

const contracts$ = contractIds$.pipe(
  rateLimit(4, 1000),
  mergeMap(contract => this.get(contract.DocumentNumber)),
)
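To check what timing this should produce, here is a small model of the token logic in plain TypeScript. It is only a sketch: releaseTimes is a hypothetical helper (not part of RxJS or of the operator above), it assumes the arrival times are sorted in ascending order, and it ignores timer jitter.

```typescript
// Hypothetical helper: predicts when each value would leave
// rateLimit(count, windowMs), given the times (in ms) at which values arrive.
// Assumes `arrivals` is sorted in ascending order.
function releaseTimes(arrivals: number[], count: number, windowMs: number): number[] {
  const released: number[] = []
  arrivals.forEach((arrival, i) => {
    // The token used by value i - count is renewed windowMs after that release;
    // the first `count` values find a free token straight away.
    const tokenFreeAt = i >= count ? released[i - count] + windowMs : 0
    released.push(Math.max(arrival, tokenFreeAt))
  })
  return released
}

// Three calls within one second, limited to 1 per second (the case in the question)
const questionCase = releaseTimes([0, 100, 200], 1, 1000) // [0, 1000, 2000]

// Six simultaneous requests, limited to 4 per second
const burstCase = releaseTimes([0, 0, 0, 0, 0, 0], 4, 1000) // [0, 0, 0, 0, 1000, 1000]
```

For the question's case this suggests piping the queue through rateLimit(1, 1000): nothing is dropped, and each value is released at least one second after the previous one.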

Maybe this will help you out :)

Upvotes: 2

Jonathan Stellwag

Reputation: 4267

You can use the completion of an observable in combination with concatAll.

concatAll subscribes to the next inner observable only when the previous one has completed.

1. Create your Subject

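import { Observable, Subject, timer } from 'rxjs'
import { concatAll, map, takeUntil } from 'rxjs/operators'
const source$$ = new Subject();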

2. Provide a function that maps your value to an Observable that is completed after 1000 ms

// Emits the value immediately, then stays open until `time` ms have passed
const timeCompletedSource$ = (time) => (value) => new Observable(observer => observer.next(value)).pipe(
  takeUntil(timer(time))
);

The time does not have to be a parameter. I wrote (time) => (value) => ... because I wanted an operator like throttle(1000) with a configurable duration. You can just write (value) => Observable... if you want a fixed time.

3. Use your function to map each value to a time-boxed observable and concatenate the observables with concatAll

const result$ = source$$.pipe(
  map(timeCompletedSource$(1000)),
  concatAll()
);

You find a running stackblitz here

Pro: Make a custom operator

const fastThrottle = (time) => (source) => source.pipe(
  map(timeCompletedSource$(time)),
  concatAll()
)

const result$ = source$$.pipe(
  fastThrottle(1000)
);
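To see why no value is lost with this approach, here is the same queueing behaviour written without RxJS. It is a rough sketch, not the operator itself: SpacedQueue and its schedule parameter are hypothetical names, and the scheduler defaults to setTimeout but can be swapped out, for example in tests.

```typescript
// Hypothetical stand-in for the map + concatAll pipeline, without RxJS.
type Schedule = (fn: () => void, ms: number) => void

class SpacedQueue<T> {
  private waiting: T[] = []
  private holding = false // true while the current time box is still open
  private handler: (value: T) => void
  private gapMs: number
  private schedule: Schedule

  constructor(
    handler: (value: T) => void,
    gapMs: number,
    schedule: Schedule = (fn, ms) => { setTimeout(fn, ms) },
  ) {
    this.handler = handler
    this.gapMs = gapMs
    this.schedule = schedule
  }

  // Counterpart of source$$.next(value): queue the value, start draining if idle
  next(value: T): void {
    this.waiting.push(value)
    if (!this.holding) this.drain()
  }

  private drain(): void {
    if (this.waiting.length === 0) {
      this.holding = false
      return
    }
    this.holding = true
    const value = this.waiting.shift() as T
    // Like the inner observable: deliver the value right away...
    this.handler(value)
    // ...then keep the queue closed for gapMs before taking the next one
    this.schedule(() => this.drain(), this.gapMs)
  }
}
```

With new SpacedQueue(someCall, 1000), three quick next calls run the first immediately and the other two roughly one and two seconds later, which is the behaviour the question asks for.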

Upvotes: 0
