Fantasia
Fantasia

Reputation: 77

RXJS - merge two stream results into one sorted array

I have two streams. Let's say:

const firstStream = Rx.of([
  {
    first: 'first',
  }, {
    third: 'third',

  }
]);

const secondStream = Rx.of([
  {
    second: 'second'
  }, {
    fourth: 'fourth'
  }
]);

Now I want a stream that combines the result of these two streams and maps the result array sorted as follows:

const resultArr = [
  {
    first: 'first',
  }, 
  {
    second: 'second'
  },
  {
    third: 'third',
  },
  {
    fourth: 'fourth'
  }
];

I tried to use combineLatest with RxJS flatmap operator, but that did not work out. I provided a stackblitz playground to test around: StackBlitz

I'm sure there are plenty of ways to do this. Maybe someone can help me out :)

Upvotes: 2

Views: 2726

Answers (2)

enter code hereAs you said your streams first complete and after that, you need the sorted value as single output of the stream, so I would recommend the forkJoin operator, which operator will Wait for Observables to complete and then combine last values they emitted.

const { of, forkJoin } = rxjs;
const { map } = rxjs.operators;

let a$ = of([1, 8, 10, 4]);
let b$ = of([3, 5, 43, 0]);

forkJoin(a$, b$)
  .pipe(
    map(([a, b]) => [...a, ...b]),
    map(x => x.sort((a, b) => a - b))
  )
  .subscribe(x => {
    console.log('Sorted =>', x);
  });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js" integrity="sha256-mNXCdYv896VtdKYTBWgurbyH+p9uDUgWE4sYjRnB5dM=" crossorigin="anonymous"></script>

Upvotes: 1

Hitmands
Hitmands

Reputation: 14189

const { from, merge } = rxjs;
const { reduce, map, mergeMap } = rxjs.operators

const a = from(['first', 'third']);
const b = from(['second', 'fourth']);


const sortMap = {
  first: 0,
  second: 1,
  third: 2,
  fourth: 4,
}

merge(a, b).pipe(
  // wait until every observable has completed,
  // zip all the values into an array
  reduce((res, item) => res.concat(item), []),
  
  // sort the array accordingly to your needs
  map(list => list.sort((a, b) => sortMap[a] - sortMap[b])),
  
  // flatten the array into a sequence
  mergeMap(list => list),
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.2/rxjs.umd.js" integrity="sha256-mNXCdYv896VtdKYTBWgurbyH+p9uDUgWE4sYjRnB5dM=" crossorigin="anonymous"></script>

Upvotes: 4

Related Questions