Reputation: 895
what I'm trying to achieve is this (with Angular 2/Typescript):
Observable A produces stream of events.
For each event of Observable A, make 8 different http calls. (8 switchmaps)
After all of the 8 requests return, do something (subscribe to zip of 8 switchmaps).
Repeat 8 requests for each event of Observable A (taken care of by switchmap and zip)
Code: (full code at https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)
let source = Observable
.interval(5000)
.take(100);
let requests = [];
for(let i=0; i<8;i++) {
let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.zip(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
The problem is my zip never gets called. I console.log'ged the subscription to each of the 8 switchmaps and I'm getting logs showing eight http calls return successfully each time there is an event in Observable/stream A. (also can see the 8 calls returning in the network tab of the debug tools)
But zip never emits anything.
If I try a different (less ideal) approach:
Code: (full code at https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)
let source = Observable
.interval(5000)
.take(100);
source.subscribe(x=> {
console.log(x);
let requests = [];
for(let i=0; i<8;i++) {
let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
request.subscribe(res => console.log(res.json()));
requests.push(request);
}
Observable.forkJoin(requests)
.subscribe(console.log("All requests completed"));
requests.forEach(r => r.connect());
});
This works. But with the obvious pitfall that I'm creating 8+1 nested observables/subscriptions each time Observable A emits.
(In both cases I'm using publish/connect to share/reuse subscriptions, but the problem exists even without it)
Upvotes: 2
Views: 7278
Reputation: 45106
You first example would work if you call zip
correctly with multiple arguments and pass a function to subscribe (not the result of console.log which is undefined). Demo.
Observable.zip(...requests) // <-- spread this
.subscribe(() => console.log("All requests completed")); // <-- pass a function
requests.forEach(r => r.connect());
Upvotes: 9