Reputation: 2154
Let's say we have a source$
observable, which is usually user interaction. And we want to perform an async operation on each firing of source$
and "map" the async result to an output observable, say result$
.
The naivest implementation is
result$ = source$.pipe(
mergeMap((s) => someAsyncOperation(s))
)
However, it is possible for a previous response to override most recent response because someAsyncOperation
may spend different amount of time for each round.
source: -----1-----2------->
result1: -----------|
result2: --|
result: -------------2--1-->
The last value on result$
observable is 1
, which is incorrect as we have already triggered the operation for 2
and the response 2
has already arrived.
We can replace mergeMap
with switchMap
and the graph would be:
source: -----1-----2------->
result1: -----------|
result2: --|
result: -------------2----->
For typical use cases like search suggestion, switchMap
is desirable since the response-1
is most likely to be valueless once action-2
is fired.
But for some other cases, responses for previous actions may still be valid. For example, for a periodic polling scenario, responses are valuable as long as their chronical order is perserved.
source: -----1-----2----3------->
result1: --------|
result2: -----------|
result3: ----|
mergeMap: -----------1------3-2->
switchMap:------------------3--->
expected: -----------1------3--->
It's obvious that response-1
and response-3
are both desirable as they arrive in chronical order (while response-2
is invalid because it arrives after response-3
).
The problem with mergeMap
is that it cannot omit the invalid response-2
.
While switchMap
is also suboptimal because it omits a desirable value response-1
as the second observable has already started when response-1
arrives. The problem of switchMap
worsens when the average RTT is larger than the polling interval.
source: -----1----2----3----4----5----->
result1: --------|
result2: --------|
result3: --|
result4: -------|
result5: ----|
switchMap:---------------3------------->
expected: -----------1---3---------4-5->
You can see switchMap
generates far less outputs than the ideal one
How should I get the expected output observable in this case?
Upvotes: 1
Views: 81
Reputation: 14750
You can attach an "emission index" to each response and use that to filter out older emissions.
const result$ = source$.pipe(
mergeMap((s, index) => someAsyncOperation(s).pipe(
map(response => ({ response, index }))
)),
scan((prev, cur) => ({...cur, maxIndex: Math.max(prev.maxIndex, cur.index)}), INITIAL),
filter(({index, maxIndex}) => index === maxIndex),
map(({response}) => response),
);
Here we can use scan
to keep track of the highest emitted index thus far, then use filter
to prevent emissions from older requests.
Here's a working StackBlitz demo.
Upvotes: 1