flak37

Reputation: 895

RxJS zip not working while forkJoin does

What I'm trying to achieve is this (with Angular 2/TypeScript):

Code: (full code at https://plnkr.co/edit/44yqw0RYzC7v1TFACMx1)

let source = Observable
.interval(5000)
.take(100);

let requests = [];

for(let i=0; i<8;i++) {
  let request = source.switchMap(x=> http.get('https://jsonplaceholder.typicode.com/users/'+(i+1))).publish();
  request.subscribe(res => console.log(res.json()));
  requests.push(request);
}

Observable.zip(requests)
.subscribe(console.log("All requests completed"));

requests.forEach(r => r.connect());

The problem is that my zip subscriber never gets called. I logged the subscription to each of the 8 switchMaps, and the logs show eight HTTP calls returning successfully each time there is an event in Observable/stream A (`source`). I can also see the 8 calls returning in the network tab of the debug tools.

But zip never emits anything.


If I try a different (less ideal) approach:

Code: (full code at https://plnkr.co/edit/GqQde1Ae2licBjtL0jcj)

let source = Observable
.interval(5000)
.take(100);

 source.subscribe(x=> {
   console.log(x);
   let requests = [];

   for(let i=0; i<8;i++) {
     let request = http.get('https://jsonplaceholder.typicode.com/users/'+(i+1)).publish();
     request.subscribe(res => console.log(res.json()));
     requests.push(request);
   }

   Observable.forkJoin(requests)
   .subscribe(console.log("All requests completed"));

   requests.forEach(r => r.connect());

 });

This works, but with the obvious pitfall that I'm creating 8+1 nested observables/subscriptions each time Observable A emits.

(In both cases I'm using publish/connect to share/reuse subscriptions, but the problem exists even without it)

Upvotes: 2

Views: 7278

Answers (1)

Yury Tarabanko

Reputation: 45106

Your first example would work if you call zip correctly with multiple arguments and pass a function to subscribe (not the result of console.log, which is undefined).

Observable.zip(...requests) // <-- spread this 
    .subscribe(() => console.log("All requests completed")); // <-- pass a function

requests.forEach(r => r.connect());
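
To make the two mistakes easier to see in isolation, here is a minimal sketch that does not use RxJS at all. `countSources` and `describeHandler` are hypothetical stand-ins (not RxJS functions) for a variadic call like `Observable.zip(...)` and for `subscribe(...)`, and the strings in `requests` are placeholders for the 8 observables.

```typescript
// Hypothetical stand-in for a variadic API such as Observable.zip(a, b, c).
// It only reports how many separate sources it was given.
function countSources(...sources: unknown[]): number {
  return sources.length;
}

// Placeholders for the request observables (assumption: 3 instead of 8).
const requests: string[] = ["r1", "r2", "r3"];

// Passing the array itself: the function sees ONE argument (the array).
const passedAsArray = countSources(requests);

// Spreading the array: the function sees one argument per element.
const passedSpread = countSources(...requests);

// Hypothetical stand-in for subscribe(next): reports what kind of value it got.
function describeHandler(next: unknown): string {
  return typeof next;
}

// console.log(...) runs right here and returns undefined,
// so what gets passed on is undefined, not a callback.
const handlerFromCall = describeHandler(console.log("All requests completed"));

// An arrow function is a real callback that can be invoked later.
const handlerFromArrow = describeHandler(() => console.log("All requests completed"));
```

In this sketch the message is printed once, immediately, by the call-result version, which matches the symptom of a "subscriber" that never fires afterwards.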

Upvotes: 9
