Adam

Reputation: 2897

RxJS - How to use toArray() with an array of asynchronous observables?

I'm creating an array of asynchronous observables with Rx.Observable.create() and hope to use .toArray() to get all the values when they complete.

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});

The example above is available at http://jsbin.com/wegoha/10/edit?js,console.

I'm using setTimeout as a stand-in for other asynchronous operations to keep the example simple.

Upvotes: 8

Views: 9118

Answers (1)

Enigmativity

Reputation: 117064

Your code is correct, except that the source observables never complete.

The toArray() operator only emits once its source observable completes. Because the observables you build with Rx.Observable.create() never call onCompleted(), the merged stream never completes, so toArray() never emits its array.

Try this:

console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
      obs.onCompleted(); // signal completion so toArray() can emit
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});

Rx.Observable.from(valsArray)
.flatMap((v)=>v)
.toArray()
.subscribe((arr)=>{
  console.log("arr should be ['a','b','c']",arr);
});
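
To see why the missing onCompleted() matters, here is a minimal, dependency-free sketch of how an operator like toArray() behaves. It is not the RxJS implementation: createObservable, toArray and collect below are hypothetical stand-ins written only for illustration, and everything runs synchronously to keep it short.

```javascript
// Hypothetical minimal stand-in for an observable (NOT the RxJS API).
function createObservable(subscribeFn) {
  return { subscribe: (observer) => subscribeFn(observer) };
}

// Simplified toArray: buffers every value and emits the buffer
// only when the source signals completion.
function toArray(source) {
  return createObservable((observer) => {
    const buffer = [];
    return source.subscribe({
      onNext: (value) => buffer.push(value),
      onCompleted: () => {
        observer.onNext(buffer);
        observer.onCompleted();
      },
    });
  });
}

// Emits a value but never completes, like the code in the question.
const neverCompletes = createObservable((obs) => {
  obs.onNext('a');
});

// Emits a value and then completes, like the corrected code.
const completes = createObservable((obs) => {
  obs.onNext('a');
  obs.onCompleted();
});

const results = [];
const collect = (label) => ({
  onNext: (arr) => results.push([label, arr]),
  onCompleted: () => {},
});

toArray(neverCompletes).subscribe(collect('neverCompletes'));
toArray(completes).subscribe(collect('completes'));

console.log(results); // only the 'completes' entry is present
```

The source that never completes leaves its values stuck in the buffer, which is the same situation the original query was in.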

Also, as a side note, the .publish().refCount() looks unnecessary here. Nothing in this code requires the source observables to be hot.

Upvotes: 11
