John Tiggernaught

Reputation: 788

MergeMap from Array of Observables

TLDR: A working example is in the last code block of this question. See @bryan60's answer for a working example using concat rather than mergeMap.


I'm trying to run a number of remote requests sequentially, but only the first observable is executed.

The number of requests varies, so I can't use a dodgy solution where I nest observables within each other.

I'm using the following code:

const observables = [
  observable1,
  observable2,
  ...
];

from(observables).pipe(
  mergeMap(ob=> {
    return ob.pipe(map(res => res));
  }, undefined, 1)
).subscribe(res => {
  console.log('Huzzah!');
})

In the past (RxJS 5.5) I've used the following:

let o = Observable.from(observables).mergeMap((ob) => {
  return ob;
}, null, 1);

o.subscribe(res => {
  console.log('Huzzah!');
})

I'm not sure what I'm doing wrong, can anybody shed some light?

An additional request would be to only print 'Huzzah!' once on completion of all requests rather than for each individual Observable.

EDIT:

Removing undefined from my original code makes it work; however, there was another issue causing only the first observable to be executed.

I'm using Angular's HttpClient for remote requests. My observable code looked like this:

const observables = [];

// Only the first observable would be executed
observables.push(this.http.get(urla));
observables.push(this.http.get(urlb));
observables.push(this.http.get(urlc));

Adding .pipe(take(1)) to each observable results in each observable being executed:

const observables = [];

// All observables will now be executed
observables.push(this.http.get(urla).pipe(take(1)));
observables.push(this.http.get(urlb).pipe(take(1)));
observables.push(this.http.get(urlc).pipe(take(1)));

The code I ended up using, which executes all observables in sequential order and only triggers Huzzah! once is:

const observables = [];

observables.push(this.http.get(urla).pipe(take(1)));
observables.push(this.http.get(urlb).pipe(take(1)));
observables.push(this.http.get(urlc).pipe(take(1)));

from(observables).pipe(
  mergeMap(ob=> {
    return ob.pipe(map(res => res));
  }, 1),
  reduce((all: any, res: any) => all.concat(res), [])
).subscribe(res => {
  console.log('Huzzah!');
})
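A self-contained sketch of the same pattern, in case it helps to try it outside Angular. `fakeGet` and the `/a`, `/b`, `/c` URLs are hypothetical stand-ins for `this.http.get(...)`; the `started` array only records the order in which each source is subscribed to.

```typescript
import { defer, from, of, Observable } from 'rxjs';
import { mergeMap, reduce } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(url): emits one value, then completes.
const started: string[] = [];
const fakeGet = (url: string): Observable<string> =>
  defer(() => {
    started.push(url); // runs only when the observable is actually subscribed to
    return of(`response from ${url}`);
  });

const observables = [fakeGet('/a'), fakeGet('/b'), fakeGet('/c')];

let collected: string[] = [];
from(observables).pipe(
  // concurrency 1: the next observable is subscribed to only after the previous one completes
  mergeMap(ob => ob, 1),
  // gather every response into one array, emitted once when all sources have completed
  reduce((all: string[], res: string) => all.concat(res), [] as string[])
).subscribe(all => {
  collected = all;
  console.log('Huzzah!', all);
});
```

Because `reduce` only emits when its source completes, the subscriber runs once, after the last request has finished.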

Thanks to @bryan60 for helping me with this issue.

Upvotes: 3

Views: 6532

Answers (2)

bryan60

Reputation: 29335

if these are http requests that complete, I think your bug is caused by a change to the mergeMap signature that removed the result selector. it's hard to be sure without knowing exactly which version you're on as it was there, then removed, then added again, and they're removing it once more for good in v7.

if you want to run them sequentially... this is all you need...

// concat runs input observables sequentially
concat(...observables).subscribe(res => console.log(res))

if you want to wait till they're all done to emit, do this:

concat(...observables).pipe(
  // this will gather all responses and emit them all when they're done
  reduce((all, res) => all.concat([res]), [])
  // if you don't care about the responses, just use last()
).subscribe(allRes => console.log(allRes))

In my personal utility rxjs lib, I always include a concatJoin operator that combines concat and reduce like this.
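The answer does not show that operator, so the following is only a guess at what such a helper might look like, not the author's actual implementation. The name `concatJoin` comes from the answer; the signature and body are assumptions.

```typescript
import { concat, of, Observable } from 'rxjs';
import { reduce } from 'rxjs/operators';

// Hypothetical helper: subscribes to the sources one after another and
// emits a single array containing everything they emitted, in order.
function concatJoin<T>(...sources: Observable<T>[]): Observable<T[]> {
  return concat(...sources).pipe(
    // wrapping res in an array keeps array-valued responses from being flattened
    reduce((all: T[], res: T) => all.concat([res]), [] as T[])
  );
}

let joined: number[] = [];
concatJoin(of(1), of(2), of(3)).subscribe(all => {
  joined = all; // emitted once, after the last source completes
});
```

With no sources at all, this sketch emits the empty seed array and completes.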

the only trick is that concat requires each observable to complete before it moves on to the next one, but the same is true for mergeMap with concurrency set to 1, so that should be fine. things like http requests are fine, as they complete naturally after one emission. websockets, subjects or event emitters behave a bit differently and have to be completed manually, either with operators like first or take, or at the source.
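A small sketch of that completion requirement, using a Subject as the source that never completes on its own. The variable names and the `raw:` / `take(1):` labels are made up for illustration.

```typescript
import { concat, of, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

const seen: string[] = [];

// A Subject does not complete by itself, so concat never reaches the second source.
const neverCompletes = new Subject<string>();
concat(neverCompletes, of('second')).subscribe(v => seen.push(`raw: ${v}`));

// take(1) completes the first source after one emission, letting concat move on.
const completesAfterOne = new Subject<string>();
concat(completesAfterOne.pipe(take(1)), of('second')).subscribe(v => seen.push(`take(1): ${v}`));

neverCompletes.next('first');
completesAfterOne.next('first');

console.log(seen);
// ['raw: first', 'take(1): first', 'take(1): second']
```

The `raw` subscription never receives `second`, which matches the "only the first observable is executed" symptom described in the question.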

Upvotes: 3

Shijil Narayanan

Reputation: 1019

If you are not concerned about the order of execution and just want 'Huzzah!' to be printed once all the observables have completed, forkJoin can also be used. Try this:

forkJoin(...observables).subscribe(res => console.log('Huzzah'));
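A runnable sketch of the forkJoin behaviour, with `of(...)` sources standing in for the HTTP requests (an assumption for illustration). It uses the array form of forkJoin. Note that forkJoin subscribes to all sources at once rather than one after another, and emits a single array holding the last value of each source, in input order.

```typescript
import { forkJoin, of } from 'rxjs';

let results: string[] = [];
let huzzahCount = 0;

// Three stand-in sources; the second emits twice so only its last value is kept.
forkJoin([of('a'), of('b1', 'b2'), of('c')]).subscribe(res => {
  results = res;
  huzzahCount += 1; // the subscriber runs once, after every source has completed
  console.log('Huzzah!', res);
});
```

As with concat, a source that never completes keeps forkJoin from emitting at all.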

Upvotes: 1
