Reputation: 745
Basically I'd like to create a queue. Something like
const queue = new BehaviorSubject([])
queue.subscribe((args) => someCall(args))
where i might call
queue.next({arg1, arg2, arg3})
in several places, sometimes very quickly after one another. I can't use throttle
or debounce
because I can't lose the intermediate calls. I need every call to be called, but no more than 1 per second. If two were to fire within a second of eachother, one would have to wait for 1 second. If 3 were to fire within a second, one would wait one second, another would wait 2 seconds.
Upvotes: 2
Views: 780
Reputation: 7672
I recently found myself in the same situation. the api I was consuming could only take 4 request every second.
This is what I came up with.
A rateLimit pipe
import { asyncScheduler, BehaviorSubject, timer, MonoTypeOperatorFunction, Observable } from 'rxjs'
import { filter, map, mergeMap, take } from 'rxjs/operators'
export function rateLimit<T>(
count: number,
slidingWindowTime: number,
scheduler = asyncScheduler,
): MonoTypeOperatorFunction<T> {
let tokens = count
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return mergeMap<T, Observable<T>>((value: T) =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(slidingWindowTime, scheduler).subscribe(renewToken)
return value
}),
),
)
}
And you can use it like this. I want to get all contracts in contractIds$ from the api. I only want to send 4 requests every 1000ms
const contracts$ = contractIds$.pipe(
rateLimit(4, 1000),
mergeMap(contract => this.get(contract.DocumentNumber)),
)
Maybe this will help you out :)
Upvotes: 2
Reputation: 4267
You can use the completion of an observable in combination with combineAll.
combineAll will emit next observable, when previous one has completed
1. Create your Subject
const source$$ = new Subject();
2. Provide a function that maps your value to an Observable that is completed after 1000 ms
const timeCompletedSource$ = (time) => (value) => Observable.create(observer => observer.next(v)).pipe(
takeUntil(timer(time))
);
You do not need to make the function time dynamic, I just made (time) => (value) => ... because I wanted to write a operator like throttle(1000) that has a dynamic time range. You can just write (value) => Observable... if you want a static time
3. Use your function to map your value to an time boxed observable and merge all observables together within the concatAll
const result$ = source$$.pipe(
map(timeCompletedSource$(time))
concatAll()
);
You find a running stackblitz here
Pro: Make a custom operator
const fastThrottle = (time) => (source) => source.pipe(
map(timeCompletedSource$(1000)),
concatAll()
)
const result$ = source$$.pipe(
fastThrottle(1000)
);
Upvotes: 0