Nate

Reputation: 7876

Subscribing to a forkJoin returns an error message

I'm trying to detect when all of my observables have returned their values. Here is how I do it, after adding import { Observable } from 'rxjs/Rx';:

    let observables:any[] = [];

    observables.push(this.getValue1().subscribe((value1)=> {
        // Do something with value1
    }));
    observables.push(this.getValue2().subscribe((value2)=> {
        // Do something with value2
    }));

    Observable.forkJoin(observables).subscribe(
        data => {
            console.log('initialized');
        },
        err => console.error(err)
    );

My values are emitted through observer.next(valueX);

I get the following error message: TypeError: source.subscribe is not a function. What am I doing wrong?

Edit: Here is how I create my getValues()

getValue1(): any {
    return new Observable(observer => {
        this.asyncFunc(arg1, (err, value1)=> {
            observer.next(value1);
        });
    });
}

Upvotes: 0

Views: 986

Answers (1)

paulpdaniels

Reputation: 18663

You are not passing Observables to forkJoin; you are passing Subscriptions. As soon as you call subscribe on an Observable, you are telling it that you have finished building the pipeline and are ready to start processing, so it returns a cancellation primitive (read: Subscription) rather than another Observable. That is why forkJoin fails with source.subscribe is not a function: a Subscription has no subscribe method.

You only need the subscribe block once, at the end. If you need to do intermediate processing on the values in the forkJoin, note that it also takes an optional selector argument, which is called with the values as separate arguments:

let observables:any[] = [];

observables.push(this.getValue1());
observables.push(this.getValue2());

Observable.forkJoin(observables, (value1, value2) => {
  //Do something with value1 and value2
  return [value1, value2];
}).subscribe(
    data => {
        console.log('initialized');
    },
    err => console.error(err)
);
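To make the Observable versus Subscription distinction concrete, here is a minimal sketch that uses hand-rolled stand-ins rather than RxJS itself. MiniObservable and MiniSubscription are hypothetical names invented for this illustration; the only point is that subscribe hands back a cancellation handle, which has nothing to subscribe to.

```typescript
// Hypothetical stand-ins for illustration; these are not the RxJS classes.
interface MiniSubscription {
  unsubscribe(): void;
}

class MiniObservable<T> {
  private producer: (next: (value: T) => void) => void;

  constructor(producer: (next: (value: T) => void) => void) {
    this.producer = producer;
  }

  // Subscribing starts the producer and returns a cancellation handle,
  // not another observable.
  subscribe(next: (value: T) => void): MiniSubscription {
    let active = true;
    this.producer(value => {
      if (active) next(value);
    });
    return { unsubscribe: () => { active = false; } };
  }
}

const source = new MiniObservable<number>(next => next(42));
const received: number[] = [];
const subscription = source.subscribe(value => received.push(value));

// The source can be subscribed to; the returned handle cannot.
const sourceIsSubscribable = typeof (source as any).subscribe === 'function';
const handleIsSubscribable = typeof (subscription as any).subscribe === 'function';
```

Pushing the handle into an array and giving that array to something that expects subscribable sources is the same mistake as in the question.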

Edit

forkJoin only emits once all of the source Observables have completed, so you will need to complete the sources (and likely handle any errors as well). Hence getValue1 should become:

getValue1(): any {
    return new Observable(observer => {
        this.asyncFunc(arg1, (err, value1)=> {
            if (err)
              return observer.error(err);

            observer.next(value1);
            observer.complete();
        });
    });
}
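The effect of a missing complete() can be sketched without RxJS. The following is a simplified, hypothetical miniForkJoin (not the library implementation) that, like forkJoin, waits for every source to complete before emitting the array of last values. All names in it are invented for the illustration.

```typescript
// Hypothetical stand-ins for illustration; not the RxJS implementation.
interface MiniObserver<T> {
  next(value: T): void;
  complete(): void;
}
type MiniSource<T> = (observer: MiniObserver<T>) => void;

// Emits the array of last values only once every source has completed.
function miniForkJoin<T>(sources: MiniSource<T>[], onResult: (values: T[]) => void): void {
  const last: T[] = new Array(sources.length);
  const hasValue: boolean[] = sources.map(() => false);
  let completed = 0;
  sources.forEach((source, i) => {
    source({
      next: value => { last[i] = value; hasValue[i] = true; },
      complete: () => {
        completed += 1;
        if (completed === sources.length && hasValue.every(Boolean)) onResult(last);
      },
    });
  });
}

// Mirrors the original getValue1: emits a value but never completes.
const neverCompletes: MiniSource<string> = observer => { observer.next('a'); };
// Mirrors the corrected getValue1: emits a value, then completes.
const completes: MiniSource<string> = observer => { observer.next('b'); observer.complete(); };

const results: string[][] = [];
miniForkJoin([neverCompletes, completes], values => results.push(values));
const emittedWithoutComplete = results.length;

miniForkJoin([completes, completes], values => results.push(values));
const emittedWithComplete = results.length;
```

With one source that never completes, the join callback never fires; once both sources complete, it fires exactly once.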

Note, however, that there is already a helper available for this called bindNodeCallback, which would allow you to simply write:

this.getValue1 = Observable.bindNodeCallback(this.asyncFunc);
this.getValue2 = Observable.bindNodeCallback(this.asyncFunc);

letting you call these functions in the same way:

let observables:any[] = [this.getValue1(), this.getValue2()];

As for the second question, how to update the progress as each source completes: if you use bindNodeCallback, you can use a secondary stream to update your progress bar.

//Lift the length into the stream
Observable.of(observables.length)
          //Create an Observable that emits any time one of the source
          //streams emits (merge takes the sources as separate arguments)
          .flatMap(len => Observable.merge(...observables),
                   //Compute the percentage completed; innerIndex is zero-based
                   (len, _, outerIndex, innerIndex) => (innerIndex + 1) / len * 100)
          //Do your updating
          .subscribe(percentCompleted => { /*Update progress bar*/ });

If you choose not to use bindNodeCallback, you can do the same thing, but it takes a bit more legwork. Each of your streams is cold, meaning that each subscribe results in a new call to asyncFunc (bindNodeCallback goes hot when called and caches the returned value for new subscribers).

//Turn these into ConnectableObservables
let observables:any[] = [this.getValue1().publishLast(), this.getValue2().publishLast()];

//Set up the other streams
Observable.of(...);

Observable.forkJoin(...);

//Start your Observables
observables.forEach(c => c.connect());
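The cold versus cached distinction can be illustrated with a small sketch that does not depend on RxJS. Everything here is a hypothetical stand-in: fakeAsyncFunc plays the role of asyncFunc (and calls back synchronously to keep things short), and cacheLast is a simplified analogue of caching the last value. Unlike publishLast, it starts on the first subscription rather than on connect().

```typescript
// Hypothetical stand-ins for illustration; not the RxJS implementation.
type Subscribe<T> = (next: (value: T) => void) => void;

let calls = 0;
// Stand-in for asyncFunc; it invokes its callback synchronously for brevity.
function fakeAsyncFunc(callback: (value: number) => void): void {
  calls += 1;
  callback(calls);
}

// Cold: the producer runs again for every subscriber.
const cold: Subscribe<number> = next => fakeAsyncFunc(next);

// Cached: the producer runs once; later subscribers get the stored value.
function cacheLast<T>(source: Subscribe<T>): Subscribe<T> {
  let started = false;
  let hasValue = false;
  let cached: T | undefined;
  const waiting: Array<(value: T) => void> = [];
  return next => {
    if (hasValue) { next(cached as T); return; }
    waiting.push(next);
    if (!started) {
      started = true;
      source(value => {
        cached = value;
        hasValue = true;
        waiting.splice(0).forEach(fn => fn(value));
      });
    }
  };
}

const coldValues: number[] = [];
cold(v => coldValues.push(v));
cold(v => coldValues.push(v));
const callsAfterCold = calls;

const shared = cacheLast(cold);
const sharedValues: number[] = [];
shared(v => sharedValues.push(v));
shared(v => sharedValues.push(v));
const callsAfterShared = calls;
```

Two subscriptions to the cold source trigger two calls, while two subscriptions to the cached source trigger only one more. This is why the progress stream and the forkJoin need to share one underlying execution.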

Upvotes: 1
