Nandin Borjigin
Nandin Borjigin

Reputation: 2154

How to organize async response in RxJS (or FRP more generically)

Let's say we have a source$ observable, which is usually user interaction. And we want to perform an async operation on each firing of source$ and "map" the async result to an output observable, say result$.

mergeMap

The naivest implementation is

result$ = source$.pipe(
  mergeMap((s) => someAsyncOperation(s))
)

However, it is possible for a previous response to override most recent response because someAsyncOperation may spend different amount of time for each round.

source: -----1-----2------->
result1:     -----------|
result2:           --|
result: -------------2--1-->

The last value on result$ observable is 1, which is incorrect as we have already triggered the operation for 2 and the response 2 has already arrived.

switchMap

We can replace mergeMap with switchMap and the graph would be:

source: -----1-----2------->
result1:     -----------|
result2:           --|
result: -------------2----->

For typical use cases like search suggestion, switchMap is desirable since the response-1 is most likely to be valueless once action-2 is fired.

Problem

But for some other cases, responses for previous actions may still be valid. For example, for a periodic polling scenario, responses are valuable as long as their chronical order is perserved.

source: -----1-----2----3------->
result1:     --------|
result2:           -----------|
result3:                ----|
mergeMap: -----------1------3-2->
switchMap:------------------3--->

expected: -----------1------3--->

It's obvious that response-1 and response-3 are both desirable as they arrive in chronical order (while response-2 is invalid because it arrives after response-3).

The problem with mergeMap is that it cannot omit the invalid response-2. While switchMap is also suboptimal because it omits a desirable value response-1 as the second observable has already started when response-1 arrives. The problem of switchMap worsens when the average RTT is larger than the polling interval.


source: -----1----2----3----4----5----->
result1:     --------|
result2:          --------|
result3:               --|
result4:                    -------|
result5:                         ----|
switchMap:---------------3------------->
expected: -----------1---3---------4-5->

You can see switchMap generates far less outputs than the ideal one

Question

How should I get the expected output observable in this case?

Upvotes: 1

Views: 81

Answers (1)

BizzyBob
BizzyBob

Reputation: 14750

You can attach an "emission index" to each response and use that to filter out older emissions.

const result$ = source$.pipe(
  mergeMap((s, index) => someAsyncOperation(s).pipe(
    map(response => ({ response, index }))
  )),
  scan((prev, cur) => ({...cur, maxIndex: Math.max(prev.maxIndex, cur.index)}), INITIAL),
  filter(({index, maxIndex}) => index === maxIndex),
  map(({response}) => response),
);

Here we can use scan to keep track of the highest emitted index thus far, then use filter to prevent emissions from older requests.

Here's a working StackBlitz demo.

Upvotes: 1

Related Questions