Robert P

Reputation: 9783

RxJs Array of Observable to Observable of Array

For a Web-Application written with Angular2 in TypeScript, I need to work with RxJs Observables.
As I have never used RxJS before and am new to reactive programming in general, I sometimes have difficulty finding the right way to do specific things.
I am now facing a problem where I have to convert an Array<Observable<T>> into an Observable<Array<T>>.
I'll try to explain it with an example:

I can easily map from Observable<Array<User>> to Observable<Array<Observable<Array<Post>>>> using
map((result : Array<User>) => result.map((user : User) => user.getPosts()))
and I can flatten an Array<Array<Post>> into an Array<Post>.
However, I just can't find the correct way to map the Observable<Array<Observable<Array<Post>>>> into an Observable<Array<Array<Post>>>.
Until now I used the combineLatest function together with flatMap.
It seemed to work, and the editor I used (Atom) did not show any errors. However, I now use NetBeans, which shows an error in this code. Compiling the code with "tsc" also results in the following errors:

Argument of type '(result: Post[]) => void' is not assignable to parameter of type 'NextObserver<[Observable<Post>]> | ErrorObserver<[Observable<Post>]> | CompletionObserver<[...'.
Type '(result: Post[]) => void' is not assignable to type '(value: [Observable<Post>]) => void'.  

So my question is:
How can I "flatten" an Array of Observables into an Array?

Upvotes: 30

Views: 75925

Answers (5)

FindOutIslamNow

Reputation: 1236

Using toArray()

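import { of, range } from 'rxjs';
import { concatMap, toArray } from 'rxjs/operators';

range(0, 7).pipe(
  concatMap(v => of(`day_${v}`)),
  toArray() // collects every emitted value into one array on completion
).subscribe(a => {
  console.log(a); // ['day_0', 'day_1', ... 'day_6']
});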

Upvotes: 0

Paul John Leonard

Reputation: 438

Here is some example code using RxJS (the top-level imports below need version 6 or later):

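import { Observable, forkJoin, of } from 'rxjs';

// An array of observables, each emitting one string and then completing
const source: Array<Observable<string>> = [of('A'), of('B'), of('C')];

// forkJoin turns it into one observable that emits a single array
forkJoin(source).subscribe((x) => console.log(x)); // ['A', 'B', 'C']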

https://arrayofobservablesexamp.stackblitz.io

Upvotes: 5

Use forkJoin on this array of observables; the result is an observable of an array.
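
To make the idea concrete, here is a dependency-free sketch of what a forkJoin-style combinator does. It is not the RxJS implementation: MiniObservable, fromValues and forkJoinLike are hypothetical names made up for this illustration, and error handling and unsubscription are left out.

```typescript
// Minimal stand-in for an observable: calls `next` zero or more times, then `complete` once.
// (Hypothetical type for illustration; this is not the RxJS Observable class.)
type MiniObservable<T> = (next: (value: T) => void, complete: () => void) => void;

// Hypothetical helper: emits the given values synchronously, then completes.
function fromValues<T>(...values: T[]): MiniObservable<T> {
  return (next, complete) => {
    values.forEach(value => next(value));
    complete();
  };
}

// Array<MiniObservable<T>> -> MiniObservable<Array<T>>.
// Once every source has completed, emits one array holding the last value of each source.
function forkJoinLike<T>(sources: MiniObservable<T>[]): MiniObservable<T[]> {
  return (next, complete) => {
    const last: T[] = new Array(sources.length);
    const hasValue: boolean[] = sources.map(() => false);
    let remaining = sources.length;
    if (remaining === 0) {
      complete(); // nothing to wait for, so nothing is emitted
      return;
    }
    sources.forEach((source, i) => {
      source(
        value => {
          last[i] = value; // remember only the most recent value per source
          hasValue[i] = true;
        },
        () => {
          remaining -= 1;
          if (remaining === 0) {
            if (hasValue.every(v => v)) {
              next(last);
            }
            complete();
          }
        }
      );
    });
  };
}

const results: string[][] = [];
forkJoinLike([fromValues('A'), fromValues('B1', 'B2'), fromValues('C')])(
  value => results.push(value),
  () => {}
);
console.log(results); // [['A', 'B2', 'C']]
```

The output position of each value matches the position of its source in the input array, which is why the combined result can be indexed per source.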

Upvotes: 4

Lievno

Reputation: 1041

You can call apply on the RxJS method you want, like this:

const source1 = Rx.Observable.interval(100)
  .map(function (i) { return 'First: ' + i; });

const source2 = Rx.Observable.interval(150)
  .map(function (i) { return 'Second: ' + i; });

const observablesArray = [source1, source2];

const sources = Rx.Observable.combineLatest
  .apply(this, observablesArray)
  .take(4);

/* or you can write it without "apply" this way:
const sources = Rx.Observable
                .combineLatest(...observablesArray)
                .take(4)
*/
sources.subscribe(
  (response) => {
    console.log(response);
  }
)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>

Upvotes: 10

Thierry Templier

Reputation: 202138

The flatMap operator allows you to do that. I don't fully understand what you are trying to do, but I'll try to provide an answer...

If you want to load all the posts for every user:

getPostsPerUser() {
  return this.http.get('/users')
    .map(res => res.json())
    .flatMap((result : Array<User>) => {
      // One observable per user; forkJoin combines them into a single observable
      return Observable.forkJoin(
        result.map((user : User) => user.getPosts()));
    });
}

Observable.forkJoin waits for all the observables to complete and then emits a single array containing the last value from each of them.

The code above assumes that user.getPosts() returns an observable...

With this, you will receive an array of arrays of posts:

this.getPostsPerUser().subscribe(result => {
  var postsUser1 = result[0];
  var postsUser2 = result[1];
  (...)
});
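
If a single flat list of posts is wanted, as the question mentions, the nested result can be flattened with plain array operations. The following is a minimal sketch; the Post shape, the sample data and the flattenPosts helper are assumptions made for illustration and are not part of the original code.

```typescript
// Hypothetical Post shape, for illustration only.
interface Post {
  title: string;
}

// Array<Array<Post>> -> Array<Post>, keeping the original order.
function flattenPosts(postsPerUser: Post[][]): Post[] {
  return postsPerUser.reduce<Post[]>((all, posts) => all.concat(posts), []);
}

// Sample data standing in for the `result` received in the subscribe callback.
const result: Post[][] = [
  [{ title: 'u1-a' }, { title: 'u1-b' }],
  [], // a user without posts contributes nothing
  [{ title: 'u2-a' }],
];

const allPosts = flattenPosts(result);
console.log(allPosts.map(post => post.title)); // ['u1-a', 'u1-b', 'u2-a']
```

The same helper could be called inside the subscribe callback, or applied in a map step before subscribing so that subscribers receive the flat array directly.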

Upvotes: 41
