Reputation: 2897
I'm creating an array of asynchronous observables with Rx.Observable.create() and hope to use .toArray() to get all the values when they complete.
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
    },i*500);
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});
Rx.Observable.from(valsArray)
  .flatMap((v)=>v)
  .toArray()
  .subscribe((arr)=>{
    console.log("arr should be ['a','b','c']",arr);
  });
The example above is at http://jsbin.com/wegoha/10/edit?js,console.
I'm using setTimeout as a stand-in for other asynchronous operations to keep the example simple.
Upvotes: 8
Views: 9118
Reputation: 117064
The code is correct, except that you never complete the source observables.
The toArray() operator can only emit its array once the observable it is applied to completes. Since the observables you build with Rx.Observable.create never call onCompleted(), your query can never end.
Try this:
console.log('running');
let valsArray = ['a','b','c'].map((val,i)=>{
  return Rx.Observable.create((obs)=>{
    let tid = setTimeout(()=>{
      console.log(val + ' timing out');
      obs.onNext(val);
      // Signal completion so that toArray() downstream can emit its array.
      obs.onCompleted();
    },i*500);
    // Disposal logic: cancel the pending timer on unsubscribe.
    return ()=>{
      clearTimeout(tid);
    };
  }).publish().refCount();
});
Rx.Observable.from(valsArray)
  .flatMap((v)=>v)
  .toArray()
  .subscribe((arr)=>{
    console.log("arr should be ['a','b','c']",arr);
  });
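To see why the onCompleted() call matters, here is a rough sketch of how a toArray-style operator behaves. It does not use RxJS and is not how the library implements it: the "observable" here is just a hypothetical function that takes an observer, and the names toArray, neverCompletes, completes and collect are made up for illustration. The point is only that the buffered array is handed on when completion arrives, and not before.

```javascript
// Hand-rolled stand-in for an observable: a function that takes an observer
// with onNext/onCompleted. This is NOT RxJS, only an illustration.
function toArray(source) {
  return (observer) => {
    const buffer = [];
    source({
      // Values are collected, but nothing is passed downstream yet.
      onNext: (value) => buffer.push(value),
      // Only on completion is the array known to be final, so emit it here.
      onCompleted: () => {
        observer.onNext(buffer);
        observer.onCompleted();
      },
    });
  };
}

// Emits one value but never completes, like the sources in the question.
const neverCompletes = (observer) => {
  observer.onNext('a');
};

// Emits one value and then completes, like the sources in the fixed code.
const completes = (observer) => {
  observer.onNext('a');
  observer.onCompleted();
};

// Subscribes through toArray and records every array that comes out.
function collect(source) {
  const results = [];
  toArray(source)({
    onNext: (arr) => results.push(arr),
    onCompleted: () => {},
  });
  return results;
}

const withoutCompletion = collect(neverCompletes);
const withCompletion = collect(completes);

console.log(withoutCompletion); // [] : the subscriber was never called
console.log(withCompletion);    // [ [ 'a' ] ] : one array, emitted on completion
```

With flatMap in front of toArray() the same rule applies to the merged stream: it only completes once every inner observable has completed, which is why each one needs its own onCompleted() call.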
Also, just as a side note, the .publish().refCount() seems wrong here. There's no need in this code to make the source observables hot.
Upvotes: 11