Joseph Silber

Reputation: 219938

RxJS mergeMap() with original order

The abstract problem

Is there any way to consume the results of a mergeMap in the original order of the outer observable, while still allowing the inner observables to run in parallel?


More detailed explanation

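Let's look at two merge-mapping operators:

  1. mergeMap: takes an optional concurrency argument, so a limited number of inner observables can run in parallel. However, it emits their values as soon as they arrive, so the output follows completion order rather than the order of the outer observable.

  2. concatMap: subscribes to one inner observable at a time and waits for it to complete before moving on to the next. The output keeps the original order, but nothing runs in parallel.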

So back to the question: is it possible to get the benefits of mergeMap, whereby a given number of requests can run in parallel, while still having the mapped values emitted in the original order?


My concrete problem

The above described the problem in abstract. It is sometimes easier to reason about a problem when you know the actual problem at hand, so here goes:

  1. I have a list of orders that have to be shipped:

     const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
  2. I have a shipOrder method that actually ships the orders. It returns a Promise:

     const shipOrder = orderNumber => api.shipOrder(orderNumber);
    
  3. The API can only process up to 5 order shipments simultaneously, so I'm using mergeMap to handle that:

     from(orderNumbers).pipe(
         mergeMap(orderNumber => shipOrder(orderNumber), 5)
     );
    
  4. After an order is shipped, we need to print its shipping label. I have a printShippingLabel function that, given the order number of a shipped order, will print its shipping label. So I subscribe to our observable, and print the shipping labels as the values come in:

     from(orderNumbers)
         .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
         .subscribe(orderNumber => printShippingLabel(orderNumber));
    
  5. This works, but now the shipping labels are printed out of order, since mergeMap emits values based on when shipOrder completes its request. What I want is for the labels to print in the same order as the original list.
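To see why the order changes, and what an order-preserving version would cost, the scheduling can be modelled without RxJS. This is a simplified model, not RxJS itself: it assumes every order is available at time 0 (as with from(orderNumbers)), that each shipment takes a fixed, made-up duration, and that a freed slot is always taken by the next order in line, which is how mergeMap queues values once its concurrency limit is reached. simulate is a hypothetical helper name.

```javascript
// Simplified model of mergeMap(fn, limit): tasks with fixed durations are
// started in input order, each on the earliest free slot.
function simulate(durations, limit) {
  const slots = new Array(limit).fill(0); // time at which each slot becomes free
  const finish = durations.map(duration => {
    const slot = slots.indexOf(Math.min(...slots));
    slots[slot] += duration;
    return slots[slot]; // finish time of this task
  });

  // mergeMap-style output: indexes in completion order (ties keep input order)
  const completionOrder = durations
    .map((_, i) => i)
    .sort((a, b) => finish[a] - finish[b] || a - b);

  // Order-preserving output: item i can only be emitted once 0..i are all done
  let latest = 0;
  const orderedEmitTimes = finish.map(t => (latest = Math.max(latest, t)));

  return { finish, completionOrder, orderedEmitTimes };
}
```

With a slow first order, simulate([5, 1, 1, 1], 2) gives a completion order of [1, 2, 3, 0] (zero-based), while in-order emission holds every label until time 5. That head-of-line blocking is inherent to any order-preserving solution; the answers below differ mainly in where the waiting values are buffered.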

Is that possible?


Visualization

See here for a visualization of the problem: https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010

You can see that later orders are being printed before earlier orders are even shipped.

Upvotes: 9

Views: 4678

Answers (6)

Ajinath Jedhe

Reputation: 180

Take a look at this solution: parallel calls with sequential loading, using the mergeMap and scan operators.

posts$: Subject<Post[]> = new Subject();

fetchPostsByIds() {
  const postsIds = [1, 2, 3, 4, 5];
  let sequenceNumber = 0; // index of the next post allowed through
  const posts: Post[] = []; // posts released so far, in original order
  from(postsIds).pipe(
    // Run the requests in parallel, tagging each response with its original index
    mergeMap((postId: number, index: number) =>
      this.http.get(`${this.host}/posts/${postId}`).pipe(
        map(post => ({ post, index }))
      )
    ),
    // Accumulate every response received so far
    scan((acc: any[], item: any) => [...acc, item], [])
  ).subscribe((result: any[]) => {
    // Release as many consecutive posts as are now available
    let sequentialPost = result.find((item: any) => item.index === sequenceNumber);
    while (sequentialPost) {
      sequenceNumber++;
      posts.push(sequentialPost.post);
      console.log("result =", posts);
      this.posts$.next(posts);
      sequentialPost = result.find((item: any) => item.index === sequenceNumber);
    }
  });
}

This logs the results in sequential order.


Upvotes: 0

Adrian Brand

Reputation: 21638

Use a scan to buffer up the results and emit them in order when they are available.

const { delay, map, mergeMap, of, scan, switchMap } = rxjs;

const api = {
  get: (url, data) => of(data.number * 2).pipe(delay(Math.random() * 500))
};

of(1, 2, 3, 4, 5, 6).pipe(
  mergeMap((number, index) => api.get('/double', { number }).pipe(
    map(response => ({ response, index })) // We need to keep the index for ordering
  ), 3),
  scan(
    (acc, response) => {
      acc.emit = [];
      let index = acc.responses.findIndex(r => r.index > response.index);
      if (index === -1) {
        index = acc.responses.length;
      }
      acc.responses.splice(index, 0, response);
      while (acc.current === acc.responses[0]?.index) {
        acc.emit.push(acc.responses.shift());
        acc.current++;
      }
      return acc;
    },
    { current: 0, emit: [], responses: [] }
  ),
  switchMap(acc => of(...acc.emit.map(r => r.response)))
).subscribe(response => {
  console.log(response);
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
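Because the scan reducer is a pure function of (accumulator, response), its behaviour can be checked without RxJS. The sketch below copies it into a named function (reorder and replay are hypothetical names) and replays a made-up sequence of out-of-order arrivals, recording what each step would hand to switchMap.

```javascript
// The scan reducer from the answer, given a name so it can be driven by hand
function reorder(acc, response) {
  acc.emit = [];
  // Insert the response so acc.responses stays sorted by index
  let index = acc.responses.findIndex(r => r.index > response.index);
  if (index === -1) {
    index = acc.responses.length;
  }
  acc.responses.splice(index, 0, response);
  // Release every response whose turn has come
  while (acc.current === acc.responses[0]?.index) {
    acc.emit.push(acc.responses.shift());
    acc.current++;
  }
  return acc;
}

// Feeds arrivals one by one and records the batch released at each step
function replay(arrivals) {
  let acc = { current: 0, emit: [], responses: [] };
  return arrivals.map(arrival => {
    acc = reorder(acc, arrival);
    return acc.emit.map(r => r.response);
  });
}

const batches = replay([
  { index: 2, response: 'c' },
  { index: 0, response: 'a' },
  { index: 1, response: 'b' },
  { index: 3, response: 'd' },
]);
```

Here batches is [[], ['a'], ['b', 'c'], ['d']]: each inner array corresponds to one scan emission, and an empty array means the arrival was only buffered.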

Upvotes: 0

mofelee

Reputation: 11

Updated at 2022-04-03

Replaced concatMap((v) => of(v).pipe(delayWhen(() => pipeNotifier))) with zipWith.


Pseudocode

zipWith(subject),
map(...do async stuff...),
concatAll()

Result

https://youtu.be/NEr6qfPlahY

request 1
request 2
request 3
response 3
request 4
response 1
request 5
1
response 4
request 6
response 2
request 7
2
3
4
response 6
request 8
response 5
request 9
5
6
response 7
request 10
7
response 9
response 10
response 8
8
9
10

Code

https://stackblitz.com/edit/js-fpds79

import { range, Subject, from, zipWith } from 'rxjs';
import { map, concatAll } from 'rxjs/operators';

const pipeNotifier = new Subject();

range(1, 10)
  .pipe(
    // 1. Make Observable controlled by pipeNotifier
    zipWith(pipeNotifier),
    // 2. Submit the request
    map(([v]) =>
      from(
        (async () => {
          console.log('request', v);
          await wait();
          console.log('response', v);

          pipeNotifier.next();

          return v;
        })()
      )
    ),
    // 3. Keep order
    concatAll()
  )
  .subscribe((x) => console.log(x));

// pipeNotifier controller: hand out 3 initial tokens, so up to 3 requests run at once
range(0, 3).subscribe(() => {
  pipeNotifier.next();
});

function wait() {
  return new Promise((resolve) => {
    const random = 5000 * Math.random();
    setTimeout(() => resolve(random), random);
  });
}
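The zipWith(pipeNotifier) step acts as a pool of tokens: a value only gets through once a token is available, each finished request hands one back, and concatAll() then consumes the already-running requests in their original order. A minimal sketch of the same idea with plain promises, assuming promise-returning tasks; orderedPool is a hypothetical helper, not part of RxJS.

```javascript
// Runs task(item) for every item with at most `limit` (>= 1) tasks in flight,
// and calls onResult in the original input order.
async function orderedPool(items, limit, task, onResult) {
  const results = new Array(items.length); // one promise per input position
  let next = 0; // index of the next item to start

  // Starting a task uses a token; its completion hands the token on
  function start() {
    if (next >= items.length) return;
    const i = next++;
    // new Promise(...) also turns a synchronous throw into a rejection
    results[i] = new Promise(resolve => resolve(task(items[i]))).finally(start);
    // Mark as handled; the rejection still surfaces when awaited below
    results[i].catch(() => {});
  }

  // Hand out the initial tokens
  for (let k = 0; k < limit; k++) start();

  // Consume in input order, like concatAll(). By the time index i is reached,
  // i earlier tasks have finished and started replacements, so results[i] exists.
  for (let i = 0; i < items.length; i++) {
    onResult(await results[i]);
  }
}
```

Called as orderedPool(orderNumbers, 5, shipOrder, printShippingLabel), this would keep at most 5 shipments in flight and print the labels in the original order, assuming shipOrder resolves to the order number as in the question.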

Upvotes: 1

Josep

Reputation: 13071

You could use this custom operator, sortedMergeMap:

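import { Observable, ObservableInput, concat, of } from 'rxjs';
import { map, mergeMap, scan } from 'rxjs/operators';

// Marker appended after each inner observable completes
const DONE = Symbol("DONE");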
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
  mapper: (i: I) => ObservableInput<O>,
  concurrent = 1
) => (source$: Observable<I>) =>
  source$.pipe(
    mergeMap(
      (value, idx) =>
        concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
      concurrent
    ),
    scan(
      (acc, [value, idx]) => {
        if (idx === acc.currentIdx) {
          if (value === DONE) {
            let currentIdx = idx;
            const valuesToEmit = [];
            do {
              currentIdx++;
              const nextValues = acc.buffer.get(currentIdx);
              if (!nextValues) {
                break;
              }
              valuesToEmit.push(...nextValues);
              acc.buffer.delete(currentIdx);
            } while (valuesToEmit[valuesToEmit.length - 1] === DONE);
            return {
              ...acc,
              currentIdx,
              valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
            };
          } else {
            return {
              ...acc,
              valuesToEmit: [value]
            };
          }
        } else {
          if (!acc.buffer.has(idx)) {
            acc.buffer.set(idx, []);
          }
          acc.buffer.get(idx)!.push(value);
          if (acc.valuesToEmit.length > 0) {
            acc.valuesToEmit = [];
          }
          return acc;
        }
      },
      {
        currentIdx: 0,
        valuesToEmit: [] as O[],
        buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
      }
    ),
    mergeMap(scannedValues => scannedValues.valuesToEmit)
  );
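The DONE marker is what lets sortedMergeMap cope with inner observables that emit several values: values for the current index pass straight through, and everything else is buffered until the current inner completes. A plain-JavaScript model of that buffering, assuming events arrive as the [value, index] pairs produced by the mergeMap step (releaseInOrder is a hypothetical name):

```javascript
const DONE = Symbol('DONE'); // marks "inner observable #idx has completed"

// Models sortedMergeMap's scan step on an array of [value, idx] events
function releaseInOrder(events) {
  const buffer = new Map(); // idx -> values (possibly ending in DONE) received early
  const out = [];
  let current = 0; // index of the inner observable currently allowed through
  for (const [value, idx] of events) {
    if (idx !== current) {
      if (!buffer.has(idx)) buffer.set(idx, []);
      buffer.get(idx).push(value);
      continue;
    }
    if (value !== DONE) {
      out.push(value); // current inner: emit immediately
      continue;
    }
    // Current inner finished: advance, flushing buffered values for each
    // following index, and keep going while those inners are also finished
    let done = true;
    while (done) {
      current++;
      const pending = buffer.get(current) ?? [];
      buffer.delete(current);
      done = pending[pending.length - 1] === DONE;
      out.push(...pending.filter(v => v !== DONE));
    }
  }
  return out;
}

const out = releaseInOrder([
  ['b0', 1], ['c0', 2], ['a0', 0], [DONE, 1],
  ['a1', 0], [DONE, 0], ['c1', 2], [DONE, 2],
]);
```

Here inner #0 emits two values and inner #1 finishes first, yet out is ['a0', 'a1', 'b0', 'c0', 'c1'].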

Upvotes: 1

Richard Laffers

Reputation: 89

What you want is this:

from(orderNumbers)
  .pipe(map(shipOrder), concatAll())
  .subscribe(printShippingLabel)

Explanation:

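The first operator in the pipe is map. It calls shipOrder for each value as soon as it arrives, and since from(orderNumbers) emits all values synchronously, every request starts right away, in parallel. Note that this does not enforce the limit of 5 concurrent requests.

The second operator, concatAll, subscribes to the resulting promises one at a time, in their original order, so the resolved values are emitted in the proper sequence even if later requests finish first.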

(I simplified the code; concatAll() is equivalent to concatMap(identity).)
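With promises, map(shipOrder) followed by concatAll() behaves roughly like starting every call up front and then awaiting the results in sequence. A sketch under that reading, assuming shipOrder never rejects (shipAllInOrder is a hypothetical name); it also makes visible that nothing caps the number of calls in flight.

```javascript
// Promise-only analogue of map(shipOrder) + concatAll()
async function shipAllInOrder(orderNumbers, shipOrder, printShippingLabel) {
  // Every request starts here, immediately and without any limit
  const pending = orderNumbers.map(shipOrder);
  // Results are consumed one at a time, in the original order
  for (const p of pending) {
    printShippingLabel(await p);
  }
}
```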

Upvotes: 0

Joseph Silber

Reputation: 219938

I did manage to partially solve it, so I'm posting it here as an answer to my own question.

I still very much want to know the canonical way to handle this situation.


A convoluted solution

  1. Create a custom operator that takes values that have an index key ({ index: number } in TypeScript parlance), and keeps a buffer of the values, only emitting them according to their index's order.

  2. Map the original list into a list of objects with their index embedded.

  3. Pass those into our custom sortByIndex operator.

  4. Map the values back into their original values.

Here's what that sortByIndex would look like:

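import { Observable } from 'rxjs';

// Re-emits values tagged with { index } strictly in index order,
// buffering any value that arrives before its predecessors
function sortByIndex() {
    return source => new Observable(subscriber => {
        const buffer = new Map(); // index -> value that arrived early
        let current = 0;          // next index allowed through
        return source.subscribe({
            next: value => {
                if (value.index !== current) {
                    buffer.set(value.index, value);
                    return;
                }
                subscriber.next(value);
                // Flush the values that were only waiting on this one
                while (buffer.has(++current)) {
                    subscriber.next(buffer.get(current));
                    buffer.delete(current);
                }
            },
            error: err => subscriber.error(err),
            complete: () => subscriber.complete(),
        });
    });
}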

With the sortByIndex operator in place, we can now complete our whole pipeline:

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };
    }, 3),
    sortByIndex(),
    map(({ number }) => number)
);

See it here in action: https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010

Creating a concurrentConcat operator

In fact, with this sortByIndex operator in place, we can now create a general concurrentConcat operator, which will do the transformations to and from the { index: number, value: T } type internally:

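import { from } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

function concurrentConcat(mapper, parallel) {
    return observable => {
        return observable.pipe(
            // Run up to `parallel` inner observables at once, tagging each
            // result with the index of the outer value that produced it
            mergeMap(
                (outer, index) => from(mapper(outer, index)).pipe(
                    map(value => ({ value, index }))
                ),
                parallel
            ),
            sortByIndex(),
            map(({ value }) => value)
        );
    };
}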

We can then use this concurrentConcat operator instead of mergeMap, and it will now emit the values in their original order:

of(1, 2, 3, 4, 5, 6).pipe(
    concurrentConcat(number => api.get('/double', { number }), 3),
);

See it here in action: https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010

So to solve my original problem with the order shipments:

from(orderNumbers)
    .pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

See it here in action: https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010

You can see that even though later orders might end up being shipped before earlier ones, the labels are always printed in their original order.


Conclusion

This solution is not even complete (it doesn't handle inner observables that emit more than one value), yet it requires a bunch of custom code. This is such a common problem that I feel there has to be an easier (built-in) way to solve this :|
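The multi-value limitation comes from sortByIndex keeping a single value per index and advancing as soon as that index has emitted once. Copying its next handler into a plain function, with no RxJS involved (createIndexSorter is a hypothetical name), makes this easy to see:

```javascript
// The `next` logic of sortByIndex, with `emit` standing in for subscriber.next
function createIndexSorter(emit) {
  const buffer = new Map(); // one slot per index, so a later value replaces an earlier one
  let current = 0;
  return value => {
    if (current !== value.index) {
      buffer.set(value.index, value);
    } else {
      emit(value);
      while (buffer.has(++current)) {
        emit(buffer.get(current));
        buffer.delete(current);
      }
    }
  };
}

const seen = [];
const next = createIndexSorter(v => seen.push(v.value));
next({ index: 1, value: 'b1' });
next({ index: 1, value: 'b2' }); // second value from inner #1 replaces 'b1'
next({ index: 0, value: 'a' });  // releases 'a', then 'b2'
next({ index: 2, value: 'c' });
next({ index: 0, value: 'a2' }); // index 0 already passed: buffered, never emitted
```

Here seen ends up as ['a', 'b2', 'c']: 'b1' is overwritten and 'a2' is stuck in the buffer. Tracking completion explicitly, as the DONE marker in the sortedMergeMap answer does, avoids both losses.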

Upvotes: 5
