Lars
Lars

Reputation: 1092

How to process a queue with dropping elements in Angular & rxjs?

A somewhat difficult problem to explain: In my code, a function is called very often. The return value is an observable. However, the called function takes some time to complete.

The send() is the function that is called very often. The process function takes some time.

send(value) {
    process(value).subscribe( result => {
        ...do something
    })
}

What I want to achieve is that process is only called as long as process is not currently running. However, no the last call should be lost and the last call to send() must be executed.

In principle, I would like to have a kind of queue where only the last element is processed. But the question now is: how do I best achieve this.

So far I've tried the following:

private isProcessing:boolean;
send(value:any) {
    if (!this.isProcessing) {
        this.isProcessing = true

        process(value).subscribe(result => {
            this.isProcessing = false
            ...do something
       })
    }
}

However, under certain circumstances, I lose the last call here. Doe anyone has a smart idea?

Upvotes: 0

Views: 237

Answers (1)

Fabian Strathaus
Fabian Strathaus

Reputation: 3580

Sure! Rxjs operators to the rescue! I suggest you to create a new Subject and emit to it in the send function. Then you should pipe the output from this Subject and map your first observable (a subject is basically an observable) to your observable returned from your process function.

In your case is makes sense to either map this first observable to the second one using exhaustMap operator or switchMap operator. But there is a problem with both of them.

private send$ = new Subject<ValueType>();
public send(value) {
  this.send$.emit(value);
}

//either exhaustMap: will not trigger process(value) until last observable from process(value) has completed
private processResult$ = send$.pipe(exhaustMap( value => process(value) ));

//or switchMap: will cancel subscription to last observable coming from process(value) and switch to next one
private processResult$ = send$.pipe(switchMap( value => process(value) ));

Now to the downsights of both solutions:

exhaustMap will ensure that process only gets called after last 'run' is completed. Nevertheless it's possible that the last data point is lost. Here the problem is: How do we know which data point is the last one? Will the stream be completed?

switchMap will ensure that the last data point is not lost. Nevertheless process function will be called for each value emitted. You should ensure that cancel of the subscription leads to no further processing.

Upvotes: 1

Related Questions