Reputation: 7876
I'm trying to detect when all of my observables have returned their values. Here is how I do it (Observable is imported with import { Observable } from 'rxjs/Rx';):
let observables:any[] = [];
observables.push(this.getValue1().subscribe((value1)=> {
  // Do something with value1
}));
observables.push(this.getValue2().subscribe((value2)=> {
  // Do something with value2
}));
Observable.forkJoin(observables).subscribe(
  data => {
    console.log('initialized');
  },
  err => console.error(err)
);
My values are returned through observer.next(valueX);
I get the following error message: TypeError: source.subscribe is not a function. What am I doing wrong?
Edit: Here is how I create my getValues()
getValue1(): any {
  return new Observable(observer => {
    this.asyncFunc(arg1, (err, value1)=> {
      observer.next(value1);
    });
  });
}
Upvotes: 0
Views: 986
Reputation: 18663
You are not passing Observables to forkJoin, you are passing Subscriptions. As soon as you call subscribe on an Observable you are telling it that you have finished building the pipeline and are ready to start processing, so instead of another Observable it returns a cancellation primitive (read: Subscription).
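To see why the error mentions source.subscribe, here is a dependency-free sketch. MiniObservable and MiniSubscription are hypothetical stand-ins written for this illustration, not the RxJS classes. They only mimic the shape of the API: subscribe starts the work and hands back an object that can cancel it, and that object has no subscribe method of its own.

```typescript
// Hypothetical stand-ins for illustration only; not the RxJS implementation.
interface MiniSubscription {
  unsubscribe(): void;
}

class MiniObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // Subscribing runs the producer and returns a cancellation handle.
  subscribe(next: (value: T) => void): MiniSubscription {
    let active = true;
    this.producer(value => {
      if (active) next(value);
    });
    return { unsubscribe: () => { active = false; } };
  }
}

const source = new MiniObservable<number>(next => next(42));
const received: number[] = [];
const subscription = source.subscribe(v => { received.push(v); });

// The source can be subscribed to; the returned handle cannot.
const sourceHasSubscribe = typeof (source as any).subscribe === 'function';
const subscriptionHasSubscribe = typeof (subscription as any).subscribe === 'function';
```

Anything that later tries to call subscribe on the handle, as forkJoin does with each element of the array it is given, fails in the same way as in the question.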
You only need the subscribe block once at the end. If you need to do intermediate processing on the values in the forkJoin, note that it takes an optional selector argument as well.
let observables:any[] = [];
observables.push(this.getValue1());
observables.push(this.getValue2());
//The selector receives the joined values as separate arguments
Observable.forkJoin(observables, (value1, value2) => {
  //Do something with value1 and value2
  return [value1, value2];
}).subscribe(
  data => {
    console.log('initialized');
  },
  err => console.error(err)
);
Edit
Since forkJoin only joins the Observables when they complete, you will need to complete the sources (and likely handle any errors as well). Hence getValue1 should become:
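getValue1(): any {
  return new Observable(observer => {
    this.asyncFunc(arg1, (err, value1)=> {
      //Forward failures instead of silently dropping them
      if (err)
        return observer.error(err);
      observer.next(value1);
      //Signal completion so that forkJoin can emit
      observer.complete();
    });
  });
}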
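Why completion matters can be sketched without RxJS. forkJoinSketch and the Source type below are hypothetical helpers written for this illustration; they assume sources that report next and complete through plain callbacks. The helper only produces a result once every source has completed, using the last value each one emitted. This mirrors the documented behaviour of forkJoin but is not its implementation.

```typescript
// Hypothetical, simplified model of a source:
// it calls next zero or more times, then complete.
type Source<T> = (next: (value: T) => void, complete: () => void) => void;

// Hypothetical helper: calls done once, after every source has completed,
// with the last value each source emitted.
function forkJoinSketch<T>(sources: Source<T>[], done: (values: T[]) => void): void {
  const last: T[] = new Array(sources.length);
  const hasValue: boolean[] = sources.map(() => false);
  let remaining = sources.length;

  sources.forEach((source, i) => {
    source(
      value => { last[i] = value; hasValue[i] = true; },
      () => {
        remaining -= 1;
        if (remaining === 0 && hasValue.every(Boolean)) done(last);
      }
    );
  });
}

// Emits a value but never completes, like the getValue1 in the question.
const neverCompletes: Source<string> = (next) => { next('value1'); };
// Emits a value and then completes, like the corrected getValue1.
const completes: Source<string> = (next, complete) => { next('value2'); complete(); };

// Results are collected in arrays so that "never called" is easy to observe.
const stuck: string[][] = [];
forkJoinSketch([neverCompletes, completes], values => { stuck.push(values); });

const joined: string[][] = [];
forkJoinSketch([completes, completes], values => { joined.push(values); });
```

In this sketch the first call never reaches its callback because one source stays open, which is the same reason 'initialized' is never logged when getValue1 omits observer.complete().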
Note, however, that there is already an operator available for this called bindNodeCallback, which would allow you to simply write:
this.getValue1 = Observable.bindNodeCallback(this.asyncFunc);
this.getValue2 = Observable.bindNodeCallback(this.asyncFunc);
letting you call these functions in the same way:
let observables:any[] = [this.getValue1(), this.getValue2()];
As for the second question, of how to update as each one completes: if you use bindNodeCallback you can use a secondary stream to update your progress bar.
//Lift the length into the stream
Observable.of(observables.length)
  //Create an Observable that emits any time one of the source
  //streams emits
  .flatMap(len => Observable.merge(...observables),
    //Compute the percentage completed; innerIndex is zero-based
    (len, _, outerIndex, innerIndex) => (innerIndex + 1) / len * 100)
  //Do your updating
  .subscribe(percentCompleted => { /*Update progress bar*/ });
If you choose not to use bindNodeCallback you can do the same thing, but you will need to do a bit more legwork because each of your streams is cold, meaning that each subscribe results in a new call to asyncFunc (bindNodeCallback goes hot when called and caches the returned value for new subscribers).
//Turn these into ConnectableObservables
let observables:any[] = [this.getValue1().publishLast(), this.getValue2().publishLast()];
//Set up the other streams
Observable.of(...);
Observable.forkJoin(...);
//Start your Observables
observables.forEach(c => c.connect());
Upvotes: 1