Reputation: 1092
A somewhat difficult problem to explain: In my code, a function is called very often. The return value is an observable. However, the called function takes some time to complete.
send() is the function that is called very often. The process function takes some time.
send(value) {
  process(value).subscribe(result => {
    // ...do something
  })
}
What I want to achieve is that process is only called while no other process call is currently running. However, the last call must not be lost: the last call to send() must always be executed.
In principle, I would like to have a kind of queue where only the last element is processed. But the question now is: how do I best achieve this?
So far I've tried the following:
private isProcessing: boolean;
send(value: any) {
  if (!this.isProcessing) {
    this.isProcessing = true
    process(value).subscribe(result => {
      this.isProcessing = false
      // ...do something
    })
  }
}
However, under certain circumstances, I lose the last call here. Does anyone have a smart idea?
Upvotes: 0
Views: 237
Reputation: 3580
Sure! RxJS operators to the rescue!
I suggest you create a new Subject and emit to it in the send function. Then pipe the output of this Subject and map that first observable (a subject is basically an observable) to the observable returned from your process function.
In your case it makes sense to map the first observable to the second one using either the exhaustMap operator or the switchMap operator. But there is a problem with both of them.
import { Subject } from 'rxjs';
import { exhaustMap, switchMap } from 'rxjs/operators';

private send$ = new Subject<ValueType>();
public send(value: ValueType) {
  // A Subject pushes values with next(); emit() belongs to Angular's EventEmitter
  this.send$.next(value);
}
// either exhaustMap: ignores new values until the previous observable from process(value) has completed
private processResult$ = this.send$.pipe(exhaustMap(value => process(value)));
// or switchMap: cancels the subscription to the previous observable from process(value) and switches to the next one
private processResult$ = this.send$.pipe(switchMap(value => process(value)));
Now to the downsides of both solutions:
exhaustMap ensures that process is only called after the previous run has completed. However, values emitted while a run is in progress are dropped, so the last data point may be lost. The underlying problem is: how do we know which data point is the last one? Will the stream be completed?
switchMap ensures that the last data point is not lost. However, process is called for every emitted value. You should make sure that cancelling the previous subscription really stops any further processing.
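If neither trade-off is acceptable, the "queue where only the last element is processed" from the question can be sketched by hand. The following is only a minimal, library-free illustration, not an RxJS operator: LatestOnlyQueue, Task and the done callback are hypothetical names invented for this example. It runs one task at a time and remembers only the newest value that arrived while a task was busy.

```typescript
// Hypothetical helper (not part of RxJS): a task receives a value and a
// done() callback that it must call once its work has finished.
type Task<T> = (value: T, done: () => void) => void;

class LatestOnlyQueue<T> {
  private task: Task<T>;
  private running = false;
  // Wrapped in an object so that "no pending value" is distinguishable
  // from a pending value that happens to be null or undefined.
  private pending: { value: T } | null = null;

  constructor(task: Task<T>) {
    this.task = task;
  }

  send(value: T): void {
    if (this.running) {
      // Overwrite any older waiting value; only the newest one survives.
      this.pending = { value };
      return;
    }
    this.run(value);
  }

  private run(value: T): void {
    this.running = true;
    this.task(value, () => {
      this.running = false;
      const next = this.pending;
      this.pending = null;
      if (next !== null) {
        // Something arrived while we were busy: process the latest value now.
        this.run(next.value);
      }
    });
  }
}
```

To connect this to the observable from the question, the task could be something like (value, done) => process(value).subscribe({ next: handleResult, complete: done }), where handleResult is a placeholder for the "do something" part. Error handling is left out here, so a process observable that errors would leave the queue stuck in the running state.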
Upvotes: 1